This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch release-1.0.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b4c3ad69a912ee2cae2e0a744fbd5f327358c44b Author: Sivabalan Narayanan <[email protected]> AuthorDate: Fri Apr 4 04:22:13 2025 -0700 [HUDI-9259] Fixing marker reconciliation for failures during deleting additional files (#13088) * Fixing marker reconciliation for failures during deleting additional files * Addressing feedback (cherry picked from commit 87980f0b36be3185a8da6c4f33d0831dadfd2096) --- .../java/org/apache/hudi/table/HoodieTable.java | 18 ++- .../apache/hudi/table/TestHoodieSparkTable.java | 136 +++++++++++++++++++++ .../hudi/table/marker/TestDirectWriteMarkers.java | 58 +++++++++ .../hudi/table/marker/TestWriteMarkersBase.java | 10 ++ 4 files changed, 217 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index e8c1bbe981e..19a5f63381b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -95,6 +95,7 @@ import org.apache.avro.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -739,7 +740,8 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { * @throws HoodieIOException if some paths can't be finalized on storage */ public void finalizeWrite(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException { - reconcileAgainstMarkers(context, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled(), config.shouldFailOnDuplicateDataFileDetection()); + reconcileAgainstMarkers(context, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled(), config.shouldFailOnDuplicateDataFileDetection(), + 
WriteMarkersFactory.get(config.getMarkersType(), this, instantTs)); } private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) { @@ -753,7 +755,13 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { LOG.info("Deleting invalid data file=" + partitionFilePair); // Delete try { - storage.deleteFile(new StoragePath(partitionFilePair.getValue())); + StoragePath pathToDelete = new StoragePath(partitionFilePair.getValue()); + boolean deletionSuccess = storage.deleteFile(pathToDelete); + if (!deletionSuccess && storage.exists(pathToDelete)) { + throw new HoodieIOException("Failed to delete invalid path during marker reconciliation " + pathToDelete); + } + } catch (FileNotFoundException fnfe) { + // no op } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } @@ -778,16 +786,16 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { * @param consistencyCheckEnabled Consistency Check Enabled * @throws HoodieIOException */ - protected void reconcileAgainstMarkers(HoodieEngineContext context, + void reconcileAgainstMarkers(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats, boolean consistencyCheckEnabled, - boolean shouldFailOnDuplicateDataFileDetection) throws HoodieIOException { + boolean shouldFailOnDuplicateDataFileDetection, + WriteMarkers markers) throws HoodieIOException { try { // Reconcile marker and data files with WriteStats so that partially written data-files due to failed // (but succeeded on retry) tasks are removed. String basePath = getMetaClient().getBasePath().toString(); - WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), this, instantTs); if (!markers.doesMarkerDirExist()) { // can happen if it was an empty write say. 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java new file mode 100644 index 00000000000..6cc1d42a2a6 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.marker.WriteMarkers; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestHoodieSparkTable extends HoodieCommonTestHarness { + + private static final StorageConfiguration<?> CONF = getDefaultStorageConf(); + + @ParameterizedTest + @EnumSource(DeleteFailureType.class) + public void testDeleteFailureDuringMarkerReconciliation(DeleteFailureType failureType) throws IOException { + initPath(); + HoodieStorage localStorage = HoodieStorageUtils.getStorage(basePath, CONF); + WriteMarkers writeMarkers = mock(WriteMarkers.class); + String partitionPath = "p1"; + List<String> datafiles = Arrays.asList("file1", "file2", "file3"); + List<org.apache.hudi.common.model.HoodieWriteStat> writeStatList = new ArrayList<>(); + Set<String> markerList = new HashSet<>(); + datafiles.forEach(fileName -> { + 
org.apache.hudi.common.model.HoodieWriteStat writeStat = new org.apache.hudi.common.model.HoodieWriteStat(); + writeStat.setPath(partitionPath + "/" + fileName); + writeStatList.add(writeStat); + markerList.add(partitionPath + "/" + fileName); + }); + + // add 2 additional entries to markers. and create the resp data file. These two files are expected to be deleted during reconciliation. + List<String> additionalFiles = Arrays.asList("file4", "file5"); + additionalFiles.forEach(fileName -> { + markerList.add(partitionPath + "/" + fileName); + }); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath).withMarkersType(MarkerType.DIRECT.name()).build(); + when(writeMarkers.doesMarkerDirExist()).thenReturn(true); + when(writeMarkers.createdAndMergedDataPaths(getEngineContext(), writeConfig.getFinalizeWriteParallelism())).thenReturn(markerList); + + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + when(metaClient.getBasePath()).thenReturn(new StoragePath(basePath)); + when(metaClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + HoodieStorage storage = mock(HoodieStorage.class); + when(metaClient.getStorage()).thenReturn(storage); + + additionalFiles.forEach(fileName -> { + try { + StoragePath storagePath = new StoragePath(basePath + "/" + partitionPath + "/" + fileName); + if (failureType == DeleteFailureType.TRUE_ON_DELETE) { + when(storage.deleteFile(storagePath)).thenReturn(true); + } else if (failureType == DeleteFailureType.FALSE_ON_DELETE_IS_EXISTS_FALSE) { + when(storage.deleteFile(storagePath)).thenReturn(false); + when(storage.exists(storagePath)).thenReturn(false); + } else if (failureType == DeleteFailureType.FALSE_ON_DELETE_IS_EXISTS_TRUE) { + when(storage.deleteFile(storagePath)).thenReturn(false); + when(storage.exists(storagePath)).thenReturn(true); + } else if (failureType == DeleteFailureType.FILE_NOT_FOUND_EXC_ON_DELETE) { + when(storage.deleteFile(storagePath)).thenThrow(new 
FileNotFoundException("throwing file not found exception")); + } else { + // run time exception + when(storage.deleteFile(storagePath)).thenThrow(new RuntimeException("throwing run time exception")); + } + // let's create the data file. so that we can validate later. + localStorage.create(storagePath); + } catch (IOException e) { + throw new HoodieException("Failed to check data file existence " + fileName); + } + }); + HoodieTable hoodieTable = HoodieSparkTable.create(writeConfig, getEngineContext(), metaClient); + if (failureType == DeleteFailureType.RUNTIME_EXC_ON_DELETE || failureType == DeleteFailureType.FALSE_ON_DELETE_IS_EXISTS_TRUE) { + assertThrows(HoodieException.class, () -> { + hoodieTable.reconcileAgainstMarkers(getEngineContext(), "0001", writeStatList, false, false, writeMarkers); + }); + } else { // all other cases + hoodieTable.reconcileAgainstMarkers(getEngineContext(), "0001", writeStatList, false, false, writeMarkers); + // validate that additional files are deleted from storage + additionalFiles.forEach(fileName -> { + try { + verify(storage, times(1)).deleteFile(new StoragePath(basePath + "/" + partitionPath + "/" + fileName)); + } catch (IOException e) { + throw new HoodieException("Failed to validate that file exists " + fileName); + } + }); + } + } + + enum DeleteFailureType { + TRUE_ON_DELETE, + FALSE_ON_DELETE_IS_EXISTS_FALSE, + FALSE_ON_DELETE_IS_EXISTS_TRUE, + FILE_NOT_FOUND_EXC_ON_DELETE, + RUNTIME_EXC_ON_DELETE + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java index 99c2068b6bc..9e1ef3e97ec 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java @@ -19,21 +19,30 @@ package org.apache.hudi.table.marker; 
import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestDirectWriteMarkers extends TestWriteMarkersBase { @@ -69,4 +78,53 @@ public class TestDirectWriteMarkers extends TestWriteMarkersBase { markerFiles.stream().map(m -> m.getPath().toString()).collect(Collectors.toList()) ); } + + @Test + public void testMarkerReconciliation() throws IOException { + // create couple of files which exists only in markers, but not on storage. + initMetaClient(); + + // create marker files + createSomeMarkers(true); + // add 2 data files, out of which 1 is expected to be deleted during reconciliation. 
+ String fileName1 = "file5.parquet"; + String partitionPathToTest = "2020/06/01"; + StoragePath dataFile1 = createDataFile("2020/06/01", fileName1); + writeMarkers.create("2020/06/01", fileName1, IOType.CREATE); + + String fileName2 = "file6.parquet"; + StoragePath dataFile2 = createDataFile("2020/06/01", fileName2); + writeMarkers.create("2020/06/01", fileName2, IOType.CREATE); + + // create HoodieWriteStats + List<String> expectedMarkerPaths = new ArrayList<>(getRelativeMarkerPathList(true)); + List<String> expectedDataPaths = new ArrayList<>(expectedMarkerPaths.stream().map(entry -> + entry.substring(0, entry.indexOf(".marker"))).collect(Collectors.toList())); + // only add file1 and skip file2. Hence we expect file2 to be deleted during reconciliation. + expectedDataPaths.add(partitionPathToTest + "/" + fileName1); + + List<HoodieWriteStat> writeStatList = new ArrayList<>(); + expectedDataPaths.forEach(entry -> { + String fullPath = entry; + String fileName = fullPath.substring(fullPath.lastIndexOf("/") + 1); + String partitionPath = fullPath.substring(0, fullPath.lastIndexOf("/")); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPath(partitionPath + "/" + fileName); + writeStatList.add(writeStat); + }); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(metaClient.getBasePath()).withMarkersType(MarkerType.DIRECT.name()).build(); + + HoodieTable hoodieTable = HoodieSparkTable.create(writeConfig, context, metaClient); + hoodieTable.finalizeWrite(context, "000", writeStatList); // data file 2 should have been deleted. + assertTrue(storage.exists(dataFile1)); + // file 2 is expected to be deleted. 
+ assertTrue(!storage.exists(dataFile2)); + } + + @Test + public void testFailureToDeleteDuringReconciliation() { + + } + } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java index 857cf91ba15..6961df171ac 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java @@ -86,6 +86,16 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness { } } + protected StoragePath createDataFile(String partitionPath, String datafileName) { + StoragePath path = FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath + "/" + datafileName); + try { + storage.create(path, false).close(); + } catch (IOException e) { + throw new HoodieException("Failed to create data file " + path, e); + } + return path; + } + abstract void verifyMarkersInFileSystem(boolean isTablePartitioned) throws IOException; @ParameterizedTest
