This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b4c3ad69a912ee2cae2e0a744fbd5f327358c44b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Apr 4 04:22:13 2025 -0700

    [HUDI-9259] Fixing marker reconciliation for failures during deleting 
additional files (#13088)
    
    * Fixing marker reconciliation for failures during deleting additional files
    
    * Addressing feedback
    
    (cherry picked from commit 87980f0b36be3185a8da6c4f33d0831dadfd2096)
---
 .../java/org/apache/hudi/table/HoodieTable.java    |  18 ++-
 .../apache/hudi/table/TestHoodieSparkTable.java    | 136 +++++++++++++++++++++
 .../hudi/table/marker/TestDirectWriteMarkers.java  |  58 +++++++++
 .../hudi/table/marker/TestWriteMarkersBase.java    |  10 ++
 4 files changed, 217 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index e8c1bbe981e..19a5f63381b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -95,6 +95,7 @@ import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -739,7 +740,8 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @throws HoodieIOException if some paths can't be finalized on storage
    */
   public void finalizeWrite(HoodieEngineContext context, String instantTs, 
List<HoodieWriteStat> stats) throws HoodieIOException {
-    reconcileAgainstMarkers(context, instantTs, stats, 
config.getConsistencyGuardConfig().isConsistencyCheckEnabled(), 
config.shouldFailOnDuplicateDataFileDetection());
+    reconcileAgainstMarkers(context, instantTs, stats, 
config.getConsistencyGuardConfig().isConsistencyCheckEnabled(), 
config.shouldFailOnDuplicateDataFileDetection(),
+        WriteMarkersFactory.get(config.getMarkersType(), this, instantTs));
   }
 
   private void deleteInvalidFilesByPartitions(HoodieEngineContext context, 
Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
@@ -753,7 +755,13 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
           LOG.info("Deleting invalid data file=" + partitionFilePair);
           // Delete
           try {
-            storage.deleteFile(new StoragePath(partitionFilePair.getValue()));
+            StoragePath pathToDelete = new 
StoragePath(partitionFilePair.getValue());
+            boolean deletionSuccess = storage.deleteFile(pathToDelete);
+            if (!deletionSuccess && storage.exists(pathToDelete)) {
+              throw new HoodieIOException("Failed to delete invalid path 
during marker reconciliation " + pathToDelete);
+            }
+          } catch (FileNotFoundException fnfe) {
+            // no op
           } catch (IOException e) {
             throw new HoodieIOException(e.getMessage(), e);
           }
@@ -778,16 +786,16 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @param consistencyCheckEnabled Consistency Check Enabled
    * @throws HoodieIOException
    */
-  protected void reconcileAgainstMarkers(HoodieEngineContext context,
+  void reconcileAgainstMarkers(HoodieEngineContext context,
                                          String instantTs,
                                          List<HoodieWriteStat> stats,
                                          boolean consistencyCheckEnabled,
-                                         boolean 
shouldFailOnDuplicateDataFileDetection) throws HoodieIOException {
+                                         boolean 
shouldFailOnDuplicateDataFileDetection,
+                               WriteMarkers markers) throws HoodieIOException {
     try {
       // Reconcile marker and data files with WriteStats so that partially 
written data-files due to failed
       // (but succeeded on retry) tasks are removed.
       String basePath = getMetaClient().getBasePath().toString();
-      WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
this, instantTs);
 
       if (!markers.doesMarkerDirExist()) {
         // can happen if it was an empty write say.
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
new file mode 100644
index 00000000000..6cc1d42a2a6
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.marker.WriteMarkers;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieSparkTable extends HoodieCommonTestHarness {
+
+  private static final StorageConfiguration<?> CONF = getDefaultStorageConf();
+
+  @ParameterizedTest
+  @EnumSource(DeleteFailureType.class)
+  public void testDeleteFailureDuringMarkerReconciliation(DeleteFailureType 
failureType) throws IOException {
+    initPath();
+    HoodieStorage localStorage = HoodieStorageUtils.getStorage(basePath, CONF);
+    WriteMarkers writeMarkers = mock(WriteMarkers.class);
+    String partitionPath = "p1";
+    List<String> datafiles = Arrays.asList("file1", "file2", "file3");
+    List<org.apache.hudi.common.model.HoodieWriteStat> writeStatList = new 
ArrayList<>();
+    Set<String> markerList = new HashSet<>();
+    datafiles.forEach(fileName -> {
+      org.apache.hudi.common.model.HoodieWriteStat writeStat = new 
org.apache.hudi.common.model.HoodieWriteStat();
+      writeStat.setPath(partitionPath + "/" + fileName);
+      writeStatList.add(writeStat);
+      markerList.add(partitionPath + "/" + fileName);
+    });
+
+    // add 2 additional entries to markers and create the respective data files. 
These two files are expected to be deleted during reconciliation.
+    List<String> additionalFiles = Arrays.asList("file4", "file5");
+    additionalFiles.forEach(fileName -> {
+      markerList.add(partitionPath + "/" + fileName);
+    });
+
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(basePath).withMarkersType(MarkerType.DIRECT.name()).build();
+    when(writeMarkers.doesMarkerDirExist()).thenReturn(true);
+    when(writeMarkers.createdAndMergedDataPaths(getEngineContext(), 
writeConfig.getFinalizeWriteParallelism())).thenReturn(markerList);
+
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    when(metaClient.getBasePath()).thenReturn(new StoragePath(basePath));
+    when(metaClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
+    HoodieStorage storage = mock(HoodieStorage.class);
+    when(metaClient.getStorage()).thenReturn(storage);
+
+    additionalFiles.forEach(fileName -> {
+      try {
+        StoragePath storagePath = new StoragePath(basePath + "/" + 
partitionPath + "/" + fileName);
+        if (failureType == DeleteFailureType.TRUE_ON_DELETE) {
+          when(storage.deleteFile(storagePath)).thenReturn(true);
+        } else if (failureType == 
DeleteFailureType.FALSE_ON_DELETE_IS_EXISTS_FALSE) {
+          when(storage.deleteFile(storagePath)).thenReturn(false);
+          when(storage.exists(storagePath)).thenReturn(false);
+        } else if (failureType == 
DeleteFailureType.FALSE_ON_DELETE_IS_EXISTS_TRUE) {
+          when(storage.deleteFile(storagePath)).thenReturn(false);
+          when(storage.exists(storagePath)).thenReturn(true);
+        } else if (failureType == 
DeleteFailureType.FILE_NOT_FOUND_EXC_ON_DELETE) {
+          when(storage.deleteFile(storagePath)).thenThrow(new 
FileNotFoundException("throwing file not found exception"));
+        } else {
+          // run time exception
+          when(storage.deleteFile(storagePath)).thenThrow(new 
RuntimeException("throwing run time exception"));
+        }
+        // lets create the data file. so that we can validate later.
+        localStorage.create(storagePath);
+      } catch (IOException e) {
+        throw new HoodieException("Failed to check data file existence " + 
fileName);
+      }
+    });
+    HoodieTable hoodieTable = HoodieSparkTable.create(writeConfig, 
getEngineContext(), metaClient);
+    if (failureType == DeleteFailureType.RUNTIME_EXC_ON_DELETE || failureType 
== DeleteFailureType.FALSE_ON_DELETE_IS_EXISTS_TRUE) {
+      assertThrows(HoodieException.class, () -> {
+        hoodieTable.reconcileAgainstMarkers(getEngineContext(), "0001", 
writeStatList, false, false, writeMarkers);
+      });
+    } else { // all other cases
+      hoodieTable.reconcileAgainstMarkers(getEngineContext(), "0001", 
writeStatList, false, false, writeMarkers);
+      // validate that additional files are deleted from storage
+      additionalFiles.forEach(fileName -> {
+        try {
+          verify(storage, times(1)).deleteFile(new StoragePath(basePath + "/" 
+ partitionPath + "/" + fileName));
+        } catch (IOException e) {
+          throw new HoodieException("Failed to validate that file exists " + 
fileName);
+        }
+      });
+    }
+  }
+
+  enum DeleteFailureType {
+    TRUE_ON_DELETE,
+    FALSE_ON_DELETE_IS_EXISTS_FALSE,
+    FALSE_ON_DELETE_IS_EXISTS_TRUE,
+    FILE_NOT_FOUND_EXC_ON_DELETE,
+    RUNTIME_EXC_ON_DELETE
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java
index 99c2068b6bc..9e1ef3e97ec 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java
@@ -19,21 +19,30 @@
 package org.apache.hudi.table.marker;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestDirectWriteMarkers extends TestWriteMarkersBase {
 
@@ -69,4 +78,53 @@ public class TestDirectWriteMarkers extends 
TestWriteMarkersBase {
         markerFiles.stream().map(m -> 
m.getPath().toString()).collect(Collectors.toList())
     );
   }
+
+  @Test
+  public void testMarkerReconciliation() throws IOException {
+    // create couple of files which exists only in markers, but not on storage.
+    initMetaClient();
+
+    // create marker files
+    createSomeMarkers(true);
+    // add 2 data files, out of which 1 is expected to be deleted during 
reconciliation.
+    String fileName1 = "file5.parquet";
+    String partitionPathToTest = "2020/06/01";
+    StoragePath dataFile1 = createDataFile("2020/06/01", fileName1);
+    writeMarkers.create("2020/06/01", fileName1, IOType.CREATE);
+
+    String fileName2 = "file6.parquet";
+    StoragePath dataFile2 = createDataFile("2020/06/01", fileName2);
+    writeMarkers.create("2020/06/01", fileName2, IOType.CREATE);
+
+    // create HoodieWriteStats
+    List<String> expectedMarkerPaths = new 
ArrayList<>(getRelativeMarkerPathList(true));
+    List<String> expectedDataPaths = new 
ArrayList<>(expectedMarkerPaths.stream().map(entry ->
+        entry.substring(0, 
entry.indexOf(".marker"))).collect(Collectors.toList()));
+    // only add file1 and skip file2. Hence we expect file2 to be deleted 
during reconciliation.
+    expectedDataPaths.add(partitionPathToTest + "/" + fileName1);
+
+    List<HoodieWriteStat> writeStatList = new ArrayList<>();
+    expectedDataPaths.forEach(entry -> {
+      String fullPath = entry;
+      String fileName = fullPath.substring(fullPath.lastIndexOf("/") + 1);
+      String partitionPath = fullPath.substring(0, fullPath.lastIndexOf("/"));
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPath(partitionPath + "/" + fileName);
+      writeStatList.add(writeStat);
+    });
+
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(metaClient.getBasePath()).withMarkersType(MarkerType.DIRECT.name()).build();
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(writeConfig, context, 
metaClient);
+    hoodieTable.finalizeWrite(context, "000", writeStatList); // data file 2 
should have been deleted.
+    assertTrue(storage.exists(dataFile1));
+    // file 2 is expected to be deleted.
+    assertTrue(!storage.exists(dataFile2));
+  }
+
+  @Test
+  public void testFailureToDeleteDuringReconciliation() {
+
+  }
+
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java
index 857cf91ba15..6961df171ac 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java
@@ -86,6 +86,16 @@ public abstract class TestWriteMarkersBase extends 
HoodieCommonTestHarness {
     }
   }
 
+  protected StoragePath createDataFile(String partitionPath, String 
datafileName) {
+    StoragePath path = FSUtils.constructAbsolutePath(metaClient.getBasePath(), 
partitionPath + "/" + datafileName);
+    try {
+      storage.create(path, false).close();
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create data file " + path, e);
+    }
+    return path;
+  }
+
   abstract void verifyMarkersInFileSystem(boolean isTablePartitioned) throws 
IOException;
 
   @ParameterizedTest

Reply via email to