This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new bf278212f81 [HUDI-7655] Ensuring clean action executor cleans up all
intended files (#11363)
bf278212f81 is described below
commit bf278212f817cc55a9301e52c9eac01926d9f56b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 29 17:36:07 2024 -0700
[HUDI-7655] Ensuring clean action executor cleans up all intended files
(#11363)
---
.../table/action/clean/CleanActionExecutor.java | 6 +
.../table/functional/TestCleanActionExecutor.java | 188 +++++++++++++++++++++
2 files changed, 194 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 83d8cbde4a3..6973d76c5d0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -81,6 +81,12 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
boolean deleteResult = fs.delete(deletePath, isDirectory);
if (deleteResult) {
LOG.debug("Cleaned file at path :" + deletePath);
+ } else {
+ if (fs.exists(deletePath)) {
+ throw new HoodieIOException("Failed to delete path during clean
execution " + deletePath);
+ } else {
+ LOG.debug("Already cleaned up file at path :" + deletePath);
+ }
}
return deleteResult;
} catch (FileNotFoundException fio) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
new file mode 100644
index 00000000000..206e243ba17
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests how {@link CleanActionExecutor} handles each outcome of deleting a file listed in the clean plan:
+ * a successful delete, a {@code false} return with the file absent or still present, a
+ * {@link FileNotFoundException}, and a {@link RuntimeException}.
+ */
+public class TestCleanActionExecutor {
+
+  private static final StorageConfiguration<Configuration> CONF = getDefaultStorageConf();
+  private static final String PARTITION1 = "partition1";
+  private static final String CLEAN_INSTANT_TIME = "002";
+  private static final String EARLIEST_INSTANT = "20231204194919610";
+  private static final String EARLIEST_INSTANT_MINUS_THREE_DAYS = "20231201194919610";
+
+  private final HoodieEngineContext context = new HoodieLocalEngineContext(CONF);
+  private final HoodieTable<?, ?, ?, ?> mockHoodieTable = mock(HoodieTable.class);
+  private HoodieTableMetaClient metaClient;
+  private FileSystem fs;
+
+  /**
+   * Wires the mocked table, meta client and storage together so that all of them hand out the mocked {@link FileSystem}.
+   */
+  @BeforeEach
+  void setUp() {
+    metaClient = mock(HoodieTableMetaClient.class);
+    when(mockHoodieTable.getMetaClient()).thenReturn(metaClient);
+    HoodieTableConfig tableConfig = new HoodieTableConfig();
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    HoodieStorage storage = mock(HoodieStorage.class);
+    when(metaClient.getStorage()).thenReturn(storage);
+    when(mockHoodieTable.getStorage()).thenReturn(storage);
+    fs = mock(FileSystem.class);
+    when(storage.getFileSystem()).thenReturn(fs);
+    when(fs.getConf()).thenReturn(CONF.unwrap());
+  }
+
+  /**
+   * Runs the clean executor against a single-file clean plan while the mocked file system simulates
+   * {@code failureType}, and checks that the clean succeeds or fails as that type declares.
+   *
+   * @param failureType the simulated outcome of the delete call
+   * @throws IOException if the fixture file cannot be created or the clean plan cannot be serialized
+   */
+  @ParameterizedTest
+  @EnumSource(CleanFailureType.class)
+  void testPartialCleanFailure(CleanFailureType failureType) throws IOException {
+    HoodieWriteConfig config = getCleanByCommitsConfig();
+    String fileGroup = UUID.randomUUID() + "-0";
+    HoodieBaseFile baseFile = new HoodieBaseFile(String.format("/tmp/base/%s_1-0-1_%s.parquet", fileGroup, "001"));
+    Path filePath = new Path(baseFile.getPath());
+    FileSystem localFs = filePath.getFileSystem(CONF.unwrap());
+    // create() hands back an open stream: close it immediately, and register the file for removal so that
+    // repeated runs do not pile up fixture files (the executor only ever talks to the mocked file system).
+    localFs.create(filePath).close();
+    localFs.deleteOnExit(filePath);
+
+    stubDelete(failureType, filePath);
+    stubCleanTimeline(buildCleanerPlan(baseFile.getPath()));
+
+    CleanActionExecutor<?, ?, ?, ?> cleanActionExecutor = new CleanActionExecutor<>(context, config, mockHoodieTable, CLEAN_INSTANT_TIME);
+    if (failureType.cleanSucceeds) {
+      assertCleanExecutionSuccess(cleanActionExecutor, filePath);
+    } else {
+      assertCleanExecutionFailure(cleanActionExecutor);
+    }
+  }
+
+  /**
+   * Stubs the mocked file system so that deleting {@code filePath} behaves as {@code failureType} describes.
+   */
+  private void stubDelete(CleanFailureType failureType, Path filePath) throws IOException {
+    switch (failureType) {
+      case TRUE_ON_DELETE:
+        when(fs.delete(filePath, false)).thenReturn(true);
+        break;
+      case FALSE_ON_DELETE_IS_EXISTS_FALSE:
+        when(fs.delete(filePath, false)).thenReturn(false);
+        when(fs.exists(filePath)).thenReturn(false);
+        break;
+      case FALSE_ON_DELETE_IS_EXISTS_TRUE:
+        when(fs.delete(filePath, false)).thenReturn(false);
+        when(fs.exists(filePath)).thenReturn(true);
+        break;
+      case FILE_NOT_FOUND_EXC_ON_DELETE:
+        when(fs.delete(filePath, false)).thenThrow(new FileNotFoundException("throwing file not found exception"));
+        break;
+      default:
+        // run time exception
+        when(fs.delete(filePath, false)).thenThrow(new RuntimeException("throwing run time exception"));
+        break;
+    }
+  }
+
+  /**
+   * Builds a KEEP_LATEST_COMMITS clean plan whose only entry is {@code filePath}, registered under {@code PARTITION1}.
+   */
+  private static HoodieCleanerPlan buildCleanerPlan(String filePath) {
+    Map<String, List<HoodieCleanFileInfo>> partitionCleanFileInfoMap = new HashMap<>();
+    partitionCleanFileInfoMap.put(PARTITION1, Collections.singletonList(new HoodieCleanFileInfo(filePath, false)));
+    HoodieActionInstant actionInstant = new HoodieActionInstant(EARLIEST_INSTANT, HoodieTimeline.COMMIT_ACTION, HoodieInstant.State.COMPLETED.name());
+    return new HoodieCleanerPlan(actionInstant, EARLIEST_INSTANT_MINUS_THREE_DAYS, HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+        Collections.emptyMap(), CleanPlanner.LATEST_CLEAN_PLAN_VERSION, partitionCleanFileInfoMap, Collections.emptyList(), Collections.emptyMap());
+  }
+
+  /**
+   * Mocks the timelines so that the table exposes exactly one requested clean instant whose details are the
+   * serialized {@code cleanerPlan}.
+   */
+  private void stubCleanTimeline(HoodieCleanerPlan cleanerPlan) throws IOException {
+    HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
+    when(metaClient.getActiveTimeline()).thenReturn(activeTimeline);
+    when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
+    HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, CLEAN_INSTANT_TIME);
+    HoodieActiveTimeline cleanTimeline = mock(HoodieActiveTimeline.class);
+    when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
+    when(cleanTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant));
+    when(activeTimeline.getInstantDetails(cleanInstant)).thenReturn(TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+    when(activeTimeline.readCleanerInfoAsBytes(cleanInstant)).thenReturn(TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+
+    when(mockHoodieTable.getCleanTimeline()).thenReturn(cleanTimeline);
+    HoodieTimeline inflightsAndRequestedTimeline = mock(HoodieTimeline.class);
+    when(cleanTimeline.filterInflightsAndRequested()).thenReturn(inflightsAndRequestedTimeline);
+    when(inflightsAndRequestedTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant));
+    when(activeTimeline.transitionCleanRequestedToInflight(any(), any()))
+        .thenReturn(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, CLEAN_INSTANT_TIME));
+    when(mockHoodieTable.getMetadataWriter(CLEAN_INSTANT_TIME)).thenReturn(Option.empty());
+  }
+
+  /**
+   * Asserts that executing the clean throws a {@link HoodieException}.
+   */
+  private void assertCleanExecutionFailure(CleanActionExecutor<?, ?, ?, ?> cleanActionExecutor) {
+    assertThrows(HoodieException.class, cleanActionExecutor::execute);
+  }
+
+  /**
+   * Asserts that executing the clean completes and that the returned metadata lists the file name of
+   * {@code filePath} among the delete path patterns of {@code PARTITION1}.
+   */
+  private void assertCleanExecutionSuccess(CleanActionExecutor<?, ?, ?, ?> cleanActionExecutor, Path filePath) {
+    HoodieCleanMetadata cleanMetadata = cleanActionExecutor.execute();
+    assertTrue(cleanMetadata.getPartitionMetadata().containsKey(PARTITION1));
+    HoodieCleanPartitionMetadata cleanPartitionMetadata = cleanMetadata.getPartitionMetadata().get(PARTITION1);
+    assertTrue(cleanPartitionMetadata.getDeletePathPatterns().contains(filePath.getName()));
+  }
+
+  /**
+   * Returns a write config rooted at {@code /tmp} with the metadata table disabled.
+   */
+  private static HoodieWriteConfig getCleanByCommitsConfig() {
+    return HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+  }
+
+  /**
+   * Simulated outcomes of the mocked delete call, each paired with whether this test expects the clean
+   * execution to succeed under that outcome.
+   */
+  enum CleanFailureType {
+    TRUE_ON_DELETE(true),
+    FALSE_ON_DELETE_IS_EXISTS_FALSE(true),
+    FALSE_ON_DELETE_IS_EXISTS_TRUE(false),
+    FILE_NOT_FOUND_EXC_ON_DELETE(true),
+    RUNTIME_EXC_ON_DELETE(false);
+
+    private final boolean cleanSucceeds;
+
+    CleanFailureType(boolean cleanSucceeds) {
+      this.cleanSucceeds = cleanSucceeds;
+    }
+  }
+}