This is an automated email from the ASF dual-hosted git repository. lesun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new 322d647  [GOBBLIN-1158] Use input dir to document old files instead of file pathes to reduce …
322d647 is described below

commit 322d6472e84f06577e2b5949c1200cf5a3d45002
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed May 27 12:37:47 2020 -0700

    [GOBBLIN-1158] Use input dir to document old files instead of file pathes to reduce …

    Closes #2997 from ZihanLi58/COMPACTIONACTION
---
 .../compaction/action/CompactionCompleteFileOperationAction.java | 7 +------
 .../gobblin/compaction/mapreduce/CompactionJobConfigurator.java | 6 +-----
 2 files changed, 2 insertions(+), 11 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 8f43c44..252f118 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -125,12 +125,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
         newTotalRecords = this.configurator.getFileNameRecordCount();
       } else {
         this.configurator.getOldFiles()
-            .addAll(
-                DatasetHelper.getApplicableFilePaths(this.fs, dstPath, Arrays.asList(configurator.getFileExtension()))
-                    .stream()
-                    .filter(Objects::nonNull)
-                    .map(Path::toString)
-                    .collect(Collectors.toList()));
+            .add(this.fs.makeQualified(dstPath).toString());
         this.fs.delete(dstPath, true);
         FsPermission permission =
             HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index ef3e3a0..8e492c3 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -259,11 +259,7 @@ public abstract class CompactionJobConfigurator {
     }
     this.oldFiles = new HashSet<>();
     for (Path path : mapReduceInputPaths) {
-      oldFiles.addAll(DatasetHelper.getApplicableFilePaths(this.fs, path, Arrays.asList(getFileExtension()))
-          .stream()
-          .filter(Objects::nonNull)
-          .map(Path::toString)
-          .collect(Collectors.toList()));
+      oldFiles.add(this.fs.makeQualified(path).toString());
       FileInputFormat.addInputPath(job, path);
     }