This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.15.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c009895c280aa13d3c06896f18d04660841ab902 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed May 29 17:36:07 2024 -0700 [HUDI-7655] Ensuring clean action executor cleans up all intended files (#11363) --- .../table/action/clean/CleanActionExecutor.java | 6 + .../table/functional/TestCleanActionExecutor.java | 188 +++++++++++++++++++++ 2 files changed, 194 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 83d8cbde4a3..6973d76c5d0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -81,6 +81,12 @@ public class CleanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, boolean deleteResult = fs.delete(deletePath, isDirectory); if (deleteResult) { LOG.debug("Cleaned file at path :" + deletePath); + } else { + if (fs.exists(deletePath)) { + throw new HoodieIOException("Failed to delete path during clean execution " + deletePath); + } else { + LOG.debug("Already cleaned up file at path :" + deletePath); + } } return deleteResult; } catch (FileNotFoundException fio) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java new file mode 100644 index 00000000000..206e243ba17 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.functional; + +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanFileInfo; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; +import 
org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.clean.CleanActionExecutor; +import org.apache.hudi.table.action.clean.CleanPlanner; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests Clean action executor. 
+ *
+ * <p>Each {@link CleanFailureType} stubs the mocked {@code FileSystem#delete} (and, where relevant,
+ * {@code FileSystem#exists}) differently and checks whether {@link CleanActionExecutor#execute()}
+ * completes or throws a {@link HoodieException}.
+ */
+public class TestCleanActionExecutor {
+
+  private static final StorageConfiguration<Configuration> CONF = getDefaultStorageConf();
+  private static final String PARTITION1 = "partition1";
+  // Instant time of the requested clean that the executor under test runs.
+  private static final String CLEAN_INSTANT_TIME = "002";
+
+  private final HoodieEngineContext context = new HoodieLocalEngineContext(CONF);
+  private final HoodieTable<?, ?, ?, ?> mockHoodieTable = mock(HoodieTable.class);
+  private HoodieTableMetaClient metaClient;
+  // Mocked file system handed out by the mocked storage; the clean's delete/exists calls are stubbed on it.
+  private FileSystem fs;
+
+  final String earliestInstant = "20231204194919610";
+  final String earliestInstantMinusThreeDays = "20231201194919610";
+
+  @BeforeEach
+  void setUp() {
+    metaClient = mock(HoodieTableMetaClient.class);
+    when(mockHoodieTable.getMetaClient()).thenReturn(metaClient);
+    HoodieTableConfig tableConfig = new HoodieTableConfig();
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    HoodieStorage storage = mock(HoodieStorage.class);
+    when(metaClient.getStorage()).thenReturn(storage);
+    when(mockHoodieTable.getStorage()).thenReturn(storage);
+    fs = mock(FileSystem.class);
+    when(storage.getFileSystem()).thenReturn(fs);
+    when(fs.getConf()).thenReturn(CONF.unwrap());
+  }
+
+  /**
+   * Cleans a single base file of {@link #PARTITION1} while the mocked delete behaves according to
+   * {@code failureType}. A delete that returns {@code false} while the file still exists, or that throws
+   * a {@link RuntimeException}, must fail the clean; every other case must succeed and report the file
+   * in the clean metadata.
+   */
+  @ParameterizedTest
+  @EnumSource(CleanFailureType.class)
+  void testPartialCleanFailure(CleanFailureType failureType) throws IOException {
+    HoodieWriteConfig config = getCleanByCommitsConfig();
+    String fileGroup = UUID.randomUUID() + "-0";
+    HoodieBaseFile baseFile = new HoodieBaseFile(String.format("/tmp/base/%s_1-0-1_%s.parquet", fileGroup, "001"));
+    Path filePath = new Path(baseFile.getPath());
+    FileSystem localFs = filePath.getFileSystem(CONF.unwrap());
+    // Create the file on the real local file system and close the returned stream right away so the
+    // handle is not leaked.
+    localFs.create(filePath).close();
+    try {
+      stubDelete(failureType, filePath);
+      mockRequestedClean(buildCleanerPlan(baseFile));
+
+      CleanActionExecutor cleanActionExecutor = new CleanActionExecutor(context, config, mockHoodieTable, CLEAN_INSTANT_TIME);
+      switch (failureType) {
+        case TRUE_ON_DELETE:
+        case FALSE_ON_DELETE_IS_EXISTS_FALSE:
+        case FILE_NOT_FOUND_EXC_ON_DELETE:
+          assertCleanExecutionSuccess(cleanActionExecutor, filePath);
+          break;
+        default:
+          // FALSE_ON_DELETE_IS_EXISTS_TRUE and RUNTIME_EXC_ON_DELETE must fail the clean.
+          assertCleanExecutionFailure(cleanActionExecutor);
+          break;
+      }
+    } finally {
+      // The file sits under a fixed /tmp path, so remove it instead of leaving it behind after every run.
+      localFs.delete(filePath, false);
+    }
+  }
+
+  /**
+   * Stubs {@code fs.delete(filePath, false)} (and {@code fs.exists(filePath)} where the scenario needs it)
+   * to simulate the given failure type.
+   */
+  private void stubDelete(CleanFailureType failureType, Path filePath) throws IOException {
+    switch (failureType) {
+      case TRUE_ON_DELETE:
+        when(fs.delete(filePath, false)).thenReturn(true);
+        break;
+      case FALSE_ON_DELETE_IS_EXISTS_FALSE:
+        when(fs.delete(filePath, false)).thenReturn(false);
+        when(fs.exists(filePath)).thenReturn(false);
+        break;
+      case FALSE_ON_DELETE_IS_EXISTS_TRUE:
+        when(fs.delete(filePath, false)).thenReturn(false);
+        when(fs.exists(filePath)).thenReturn(true);
+        break;
+      case FILE_NOT_FOUND_EXC_ON_DELETE:
+        when(fs.delete(filePath, false)).thenThrow(new FileNotFoundException("throwing file not found exception"));
+        break;
+      case RUNTIME_EXC_ON_DELETE:
+      default:
+        when(fs.delete(filePath, false)).thenThrow(new RuntimeException("throwing run time exception"));
+        break;
+    }
+  }
+
+  /**
+   * Builds a KEEP_LATEST_COMMITS cleaner plan that lists {@code baseFile} as the only file to clean in
+   * {@link #PARTITION1}.
+   */
+  private HoodieCleanerPlan buildCleanerPlan(HoodieBaseFile baseFile) {
+    Map<String, List<HoodieCleanFileInfo>> partitionCleanFileInfoMap = new HashMap<>();
+    List<HoodieCleanFileInfo> cleanFileInfos = Collections.singletonList(new HoodieCleanFileInfo(baseFile.getPath(), false));
+    partitionCleanFileInfoMap.put(PARTITION1, cleanFileInfos);
+    return new HoodieCleanerPlan(
+        new HoodieActionInstant(earliestInstant, HoodieTimeline.COMMIT_ACTION, HoodieInstant.State.COMPLETED.name()),
+        earliestInstantMinusThreeDays,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+        Collections.emptyMap(),
+        CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
+        partitionCleanFileInfoMap,
+        Collections.emptyList(),
+        Collections.emptyMap());
+  }
+
+  /**
+   * Stubs the mocked table and meta client so that the clean timeline exposes a single requested clean
+   * instant carrying {@code cleanerPlan}, and so that transitioning it to inflight returns the inflight
+   * instant. The metadata writer is stubbed as absent.
+   */
+  private void mockRequestedClean(HoodieCleanerPlan cleanerPlan) throws IOException {
+    HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
+    when(metaClient.getActiveTimeline()).thenReturn(activeTimeline);
+    when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
+    HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, CLEAN_INSTANT_TIME);
+    HoodieActiveTimeline cleanTimeline = mock(HoodieActiveTimeline.class);
+    when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
+    when(cleanTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant));
+    when(activeTimeline.getInstantDetails(cleanInstant)).thenReturn(TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+    when(activeTimeline.readCleanerInfoAsBytes(cleanInstant)).thenReturn(TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+
+    when(mockHoodieTable.getCleanTimeline()).thenReturn(cleanTimeline);
+    HoodieTimeline inflightsAndRequestedTimeline = mock(HoodieTimeline.class);
+    when(cleanTimeline.filterInflightsAndRequested()).thenReturn(inflightsAndRequestedTimeline);
+    when(inflightsAndRequestedTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant));
+    when(activeTimeline.transitionCleanRequestedToInflight(any(), any()))
+        .thenReturn(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, CLEAN_INSTANT_TIME));
+    when(mockHoodieTable.getMetadataWriter(CLEAN_INSTANT_TIME)).thenReturn(Option.empty());
+  }
+
+  /** Asserts that executing the clean throws a {@link HoodieException}. */
+  private void assertCleanExecutionFailure(CleanActionExecutor cleanActionExecutor) {
+    assertThrows(HoodieException.class, cleanActionExecutor::execute);
+  }
+
+  /**
+   * Executes the clean and asserts that the resulting metadata contains {@link #PARTITION1} and lists
+   * the name of {@code filePath} among its delete path patterns.
+   */
+  private void assertCleanExecutionSuccess(CleanActionExecutor cleanActionExecutor, Path filePath) {
+    HoodieCleanMetadata cleanMetadata = cleanActionExecutor.execute();
+    assertTrue(cleanMetadata.getPartitionMetadata().containsKey(PARTITION1));
+    HoodieCleanPartitionMetadata cleanPartitionMetadata = cleanMetadata.getPartitionMetadata().get(PARTITION1);
+    assertTrue(cleanPartitionMetadata.getDeletePathPatterns().contains(filePath.getName()));
+  }
+
+  /** Write config rooted at {@code /tmp} with the metadata table disabled. */
+  private static HoodieWriteConfig getCleanByCommitsConfig() {
+    return HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+  }
+
+  /** Simulated outcomes of the mocked {@code FileSystem#delete} call (see {@code stubDelete}). */
+  enum CleanFailureType {
+    TRUE_ON_DELETE,
+    FALSE_ON_DELETE_IS_EXISTS_FALSE,
+    FALSE_ON_DELETE_IS_EXISTS_TRUE,
+    FILE_NOT_FOUND_EXC_ON_DELETE,
+    RUNTIME_EXC_ON_DELETE
+  }
+}
