This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 07a804880 [GOBBLIN-2105] Ensure the destination path does not exist before renaming during Gobblin compaction (#3993)
07a804880 is described below

commit 07a804880aad6c76e69d9d63b8f4a321240992bd
Author: Aditya Sharma <[email protected]>
AuthorDate: Thu Jul 11 20:20:49 2024 +0530

    [GOBBLIN-2105] Ensure the destination path does not exist before renaming during Gobblin compaction (#3993)
---
 .../CompactionCompleteFileOperationAction.java     |   6 +-
 .../CompactionCompleteFileOperationActionTest.java | 138 +++++++++++++++++++++
 2 files changed, 142 insertions(+), 2 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index f0f900d0c..0637446ed 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -136,11 +136,13 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
           dstPath =
               PathUtils.mergePaths(dstPath, new 
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
           
this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
-          //Write to a new path, no need to delete the old path
         } else {
           
this.configurator.getOldFiles().add(this.fs.makeQualified(dstPath).toString());
-          this.fs.delete(dstPath, true);
         }
+
+        // It is possible that the destination path is a non-empty directory 
if the previous run failed.
+        // Hence, always delete the destination path before moving the tmp 
path to the destination path.
+        this.fs.delete(dstPath, true);
         FsPermission permission =
             HadoopUtils.deserializeFsPermission(this.state, 
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
                 FsPermission.getDefault());
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationActionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationActionTest.java
new file mode 100644
index 000000000..e86c95f4c
--- /dev/null
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationActionTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.action;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.joda.time.DateTimeUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+
+/**
+ * Unit tests for the {@link org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction} class.
+ */
+public class CompactionCompleteFileOperationActionTest {
+
+  /**
+   * Test the deletion invocation under various conditions during CompactionCompleteFileOperationAction.
+   * <ul>
+   *   <li>For recompaction writing to a new directory, ensure that directory is deleted before renaming, in case a previous failed run left it non-empty.</li>
+   *   <li>For recompaction overwriting the previous directory, ensure the directory is deleted before renaming.</li>
+   *   <li>In append mode, ensure that neither the output directory nor a compaction_N directory is ever deleted.</li>
+   * </ul>
+   * @throws IOException if the action under test fails
+   */
+  @Test
+  public void testDeletionDuringCompaction()
+      throws IOException {
+
+    // Initialize mocks
+    final WorkUnitState wus = Mockito.spy(new WorkUnitState());
+    final CompactionJobConfigurator cjc = Mockito.mock(CompactionOrcJobConfigurator.class);
+    final InputRecordCountHelper irch = Mockito.mock(InputRecordCountHelper.class);
+    final EventSubmitter es = Mockito.mock(EventSubmitter.class);
+    final FileSystem fs = Mockito.mock(FileSystem.class);
+    final FileSystemDataset fsd = Mockito.mock(FileSystemDataset.class);
+    final Job job = Mockito.mock(Job.class);
+    final FileStatus fileStatus = Mockito.mock(FileStatus.class);
+    final Counters jobCounters = Mockito.mock(Counters.class);
+
+    // Initialize variables
+    final Path tmpFile = new Path("/tmp/somePath/someFile.orc");
+    final Path oldCompactionPath = new Path("/base/datasetName/daily/2024/01/01/compaction_1");
+    final Path newCompactionPath = new Path("/base/datasetName/daily/2024/01/01/compaction_2");
+    final String hourlyInputPathStr = "/base/datasetName/hourly/2024/01/01";
+    final Path dailyOutputPath = new Path("/base/datasetName/daily/2024/01/01");
+    final Counter recordCounter = new GenericCounter();
+    recordCounter.setValue(99);
+    final JobID jobId = new JobID("someId", 12345);
+
+    // Configure Mocks
+    Mockito.doReturn(job).when(cjc).getConfiguredJob();
+    Mockito.doReturn(false).when(fsd).isVirtual();
+    Mockito.doReturn(true).when(cjc).isJobCreated();
+    Mockito.doReturn(tmpFile).when(cjc).getMrOutputPath();
+    Mockito.doReturn(100L).when(irch).readRecordCount(Mockito.any());
+    Mockito.doReturn(1L).when(irch).readExecutionCount(Mockito.any());
+    Mockito.doReturn("orc").when(cjc).getFileExtension();
+    Mockito.doReturn(true).when(fs).exists(Mockito.any());
+    Mockito.doReturn(fileStatus).when(fs).getFileStatus(tmpFile);
+    Mockito.doReturn(true).when(fs).rename(Mockito.any(Path.class), Mockito.any(Path.class));
+    Mockito.doReturn(false).when(fileStatus).isDirectory();
+    Mockito.doReturn(tmpFile).when(fileStatus).getPath();
+    Mockito.doReturn(oldCompactionPath).when(fs).makeQualified(oldCompactionPath);
+    Mockito.doReturn(newCompactionPath).when(fs).makeQualified(newCompactionPath);
+    Mockito.doReturn(dailyOutputPath).when(fs).makeQualified(dailyOutputPath);
+    Mockito.doReturn(jobCounters).when(job).getCounters();
+    Mockito.doReturn(jobId).when(job).getJobID();
+    Mockito.doReturn(recordCounter).when(jobCounters).findCounter(Mockito.any());
+    Mockito.doReturn(new State()).when(irch).loadState(Mockito.any());
+    Mockito.doReturn(hourlyInputPathStr).when(fsd).datasetURN();
+
+    // Configure WorkUnitState
+    wus.setProp(MRCompactor.COMPACTION_INPUT_DIR, "/base");
+    wus.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, "hourly");
+    wus.setProp(MRCompactor.COMPACTION_DEST_DIR, "/base");
+    wus.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, "daily");
+    wus.setProp(CompactionSource.COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
+
+    CompactionCompleteFileOperationAction compactionCompleteFileOperationAction =
+        new CompactionCompleteFileOperationAction(wus, cjc, irch, es, fs);
+
+    // When recompaction should write to a fresh directory
+    wus.setProp(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED, false);
+    wus.setProp(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, true);
+    compactionCompleteFileOperationAction.onCompactionJobComplete(fsd);
+    Mockito.verify(fs, Mockito.times(1)).delete(newCompactionPath, true);
+    Mockito.clearInvocations(fs);
+
+    // When recompaction should overwrite the existing directory
+    wus.setProp(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED, false);
+    wus.setProp(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false);
+    compactionCompleteFileOperationAction.onCompactionJobComplete(fsd);
+    Mockito.verify(fs, Mockito.times(1)).delete(dailyOutputPath, true);
+    Mockito.clearInvocations(fs);
+
+    // When compaction should append
+    wus.setProp(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED, true);
+    compactionCompleteFileOperationAction.onCompactionJobComplete(fsd);
+    // Check the base output directory as well as the compaction_N directory (any recursive flag): neither may be deleted when appending.
+    Mockito.verify(fs, Mockito.never()).delete(Mockito.eq(dailyOutputPath), Mockito.anyBoolean());
+    Mockito.verify(fs, Mockito.never()).delete(Mockito.eq(newCompactionPath), Mockito.anyBoolean());
+  }
+}

Reply via email to