This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 322d647  [GOBBLIN-1158] Use input dir to document old files instead of 
file paths to reduce …
322d647 is described below

commit 322d6472e84f06577e2b5949c1200cf5a3d45002
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed May 27 12:37:47 2020 -0700

    [GOBBLIN-1158] Use input dir to document old files instead of file paths 
to reduce …
    
    Closes #2997 from ZihanLi58/COMPACTIONACTION
---
 .../compaction/action/CompactionCompleteFileOperationAction.java   | 7 +------
 .../gobblin/compaction/mapreduce/CompactionJobConfigurator.java    | 6 +-----
 2 files changed, 2 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 8f43c44..252f118 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -125,12 +125,7 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
         newTotalRecords = this.configurator.getFileNameRecordCount();
       } else {
         this.configurator.getOldFiles()
-            .addAll(
-                DatasetHelper.getApplicableFilePaths(this.fs, dstPath, 
Arrays.asList(configurator.getFileExtension()))
-                    .stream()
-                    .filter(Objects::nonNull)
-                    .map(Path::toString)
-                    .collect(Collectors.toList()));
+            .add(this.fs.makeQualified(dstPath).toString());
         this.fs.delete(dstPath, true);
         FsPermission permission =
             HadoopUtils.deserializeFsPermission(this.state, 
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index ef3e3a0..8e492c3 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -259,11 +259,7 @@ public abstract class CompactionJobConfigurator {
     }
     this.oldFiles = new HashSet<>();
     for (Path path : mapReduceInputPaths) {
-      oldFiles.addAll(DatasetHelper.getApplicableFilePaths(this.fs, path, 
Arrays.asList(getFileExtension()))
-          .stream()
-          .filter(Objects::nonNull)
-          .map(Path::toString)
-          .collect(Collectors.toList()));
+      oldFiles.add(this.fs.makeQualified(path).toString());
       FileInputFormat.addInputPath(job, path);
     }
 

Reply via email to