This is an automated email from the ASF dual-hosted git repository. hutran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
View the commit online: https://github.com/apache/incubator-gobblin/commit/e41fd943f096792598c213ec016f207a9a8f87fc The following commit(s) were added to refs/heads/master by this push: new e41fd94 [GOBBLIN-962] Refactor RecursiveCopyableDataset. e41fd94 is described below commit e41fd943f096792598c213ec016f207a9a8f87fc Author: Kuai Yu <[email protected]> AuthorDate: Wed Nov 20 14:44:09 2019 -0800 [GOBBLIN-962] Refactor RecursiveCopyableDataset. Closes #2811 from yukuai518/recursive --- .../management/copy/RecursiveCopyableDataset.java | 58 +++++++++++++--------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index 2d1f740..f6aaac9 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -94,19 +94,13 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData this.properties = properties; } - @Override - public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration) - throws IOException { - - Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob); - Path targetPath = - new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath)); - - Map<Path, FileStatus> filesInSource = - createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath); - Map<Path, FileStatus> filesInTarget = - createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath); - + protected Collection<? extends CopyEntity> getCopyableFilesImpl(CopyConfiguration configuration, + Map<Path, FileStatus> filesInSource, + Map<Path, FileStatus> filesInTarget, + FileSystem targetFs, + Path replacedPrefix, + Path replacingPrefix, + Path deleteEmptyDirectoriesUpTo) throws IOException { List<Path> toCopy = Lists.newArrayList(); Map<Path, FileStatus> toDelete = Maps.newHashMap(); boolean requiresUpdate = false; @@ -127,7 +121,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData if (!this.update && requiresUpdate) { throw new IOException("Some files need to be copied but they already exist in the destination. " - + "Aborting because not running in update mode."); + + "Aborting because not running in update mode."); } if (this.delete) { @@ -139,13 +133,16 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData for (Path path : toCopy) { FileStatus file = filesInSource.get(path); - Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath); - Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath); + Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), replacedPrefix); + Path thisTargetPath = new Path(replacingPrefix, filePathRelativeToSearchPath); CopyableFile copyableFile = - CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration).fileSet(datasetURN()) - .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile - .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath, - configuration)).build(); + CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration) + .fileSet(datasetURN()) + .datasetOutputPath(thisTargetPath.toString()) + .ancestorsOwnerAndPermission(CopyableFile + .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), + replacedPrefix, configuration)) + .build(); copyableFile.setFsDatasets(this.fs, targetFs); copyableFiles.add(copyableFile); } @@ -153,14 +150,29 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData if (!toDelete.isEmpty()) { CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties, - this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent()); - + this.deleteEmptyDirectories ? Optional.of(deleteEmptyDirectoriesUpTo) : Optional.<Path>absent()); copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String, String>newHashMap(), step, 1)); } - return copyEntities; } + @Override + public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration) + throws IOException { + + Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob); + Path targetPath = + new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath)); + + Map<Path, FileStatus> filesInSource = + createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath); + Map<Path, FileStatus> filesInTarget = + createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath); + + return getCopyableFilesImpl(configuration, filesInSource, filesInTarget, targetFs, + nonGlobSearchPath, configuration.getPublishDir(), targetPath); + } + @VisibleForTesting protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
