Repository: incubator-gobblin Updated Branches: refs/heads/master 1b9ec19f9 -> 7d8d40dd4
[GOBBLIN-368][GOBBLIN-365] Add configuration property to allow application of filters to directory paths Closes #2243 from sv2000/gobblin_368 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7d8d40dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7d8d40dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7d8d40dd Branch: refs/heads/master Commit: 7d8d40dd4e905ed48720d0aeecf60f8974d4c410 Parents: 1b9ec19 Author: suvasude <[email protected]> Authored: Wed Jan 17 11:57:24 2018 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Wed Jan 17 11:57:24 2018 -0800 ---------------------------------------------------------------------- .../data/management/copy/CopyConfiguration.java | 9 ++-- .../copy/RecursiveCopyableDataset.java | 38 ++++++++++------- .../org/apache/gobblin/util/FileListUtils.java | 45 ++++++++++++++------ 3 files changed, 59 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java index 82cca49..211ad13 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java @@ -55,6 +55,7 @@ public class CopyConfiguration { * Include empty directories in the source for copy */ public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories"; + public static final String APPLY_FILTER_TO_DIRECTORIES = COPY_PREFIX + ".applyFilterToDirectories"; public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias"; public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy"; @@ -101,7 +102,7 @@ public class CopyConfiguration { this.targetGroup = properties.containsKey(DESTINATION_GROUP_KEY) ? Optional.of(properties.getProperty(DESTINATION_GROUP_KEY)) - : Optional.<String> absent(); + : Optional.<String>absent(); this.preserve = PreserveAttributes.fromMnemonicString(properties.getProperty(PRESERVE_ATTRIBUTES_KEY)); Path publishDirTmp = new Path(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR)); if (!publishDirTmp.isAbsolute()) { @@ -113,9 +114,8 @@ public class CopyConfiguration { if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) { try { this.prioritizer = Optional.of(GobblinConstructorUtils.<FileSetComparator>invokeLongestConstructor( - new ClassAliasResolver(FileSetComparator.class).resolveClass(properties.getProperty( - PRIORITIZER_ALIAS_KEY)), - properties)); + new ClassAliasResolver(FileSetComparator.class).resolveClass( + properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties)); } catch (ReflectiveOperationException roe) { throw new RuntimeException("Could not build prioritizer.", roe); } @@ -138,5 +138,4 @@ public class CopyConfiguration { public Config getPrioritizationConfig() { return ConfigUtils.getConfigOrEmpty(this.config, PRIORITIZATION_PREFIX); } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index 138debe..252dafa 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -72,6 +72,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData private final boolean includeEmptyDirectories; // Delete empty directories in the destination private final boolean deleteEmptyDirectories; + //Apply filter to directories + private final boolean applyFilterToDirectories; private final Properties properties; @@ -89,6 +91,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData this.deleteEmptyDirectories = Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY)); this.includeEmptyDirectories = Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES)); + this.applyFilterToDirectories = + Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false")); this.properties = properties; } @@ -97,10 +101,13 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData throws IOException { Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob); - Path targetPath = new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath)); + Path targetPath = + new Path(configuration.getPublishDir(), PathUtils.relativizePath(this.rootPath, nonGlobSearchPath)); - Map<Path, FileStatus> filesInSource = createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath); - Map<Path, FileStatus> filesInTarget = createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath); + Map<Path, FileStatus> filesInSource = + createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter), this.rootPath); + Map<Path, FileStatus> filesInTarget = + createPathMap(getFilesAtPath(targetFs, targetPath, this.pathFilter), targetPath); List<Path> toCopy = Lists.newArrayList(); Map<Path, FileStatus> toDelete = Maps.newHashMap(); @@ -136,11 +143,11 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData FileStatus file = filesInSource.get(path); Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath); Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath); - CopyableFile copyableFile = CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration) - .fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString()) - .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, - file.getPath().getParent(), nonGlobSearchPath, configuration)) - .build(); + CopyableFile copyableFile = + CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration).fileSet(datasetURN()) + .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile + .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath, + configuration)).build(); /* * By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder @@ -164,9 +171,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles)); if (!toDelete.isEmpty()) { - CommitStep step = - new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties, - this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent()); + CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(), this.properties, + this.deleteEmptyDirectories ? Optional.of(targetPath) : Optional.<Path>absent()); copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String, String>newHashMap(), step, 1)); } @@ -175,9 +181,11 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData } @VisibleForTesting - protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { + protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) + throws IOException { try { - return FileListUtils.listFilesToCopyAtPath(fs, path, fileFilter, includeEmptyDirectories); + return FileListUtils + .listFilesToCopyAtPath(fs, path, fileFilter, applyFilterToDirectories, includeEmptyDirectories); } catch (FileNotFoundException fnfe) { return Lists.newArrayList(); } @@ -202,7 +210,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData } private static boolean sameFile(FileStatus fileInSource, FileStatus fileInTarget) { - return fileInTarget.getLen() == fileInSource.getLen() - && fileInSource.getModificationTime() <= fileInTarget.getModificationTime(); + return fileInTarget.getLen() == fileInSource.getLen() && fileInSource.getModificationTime() <= fileInTarget + .getModificationTime(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7d8d40dd/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java index 51bf66d..343da43 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java @@ -56,11 +56,13 @@ public class FileListUtils { } }; - public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path) throws IOException { + public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path) + throws IOException { return listFilesRecursively(fs, path, NO_OP_PATH_FILTER); } - public static List<FileStatus> listFilesRecursively(FileSystem fs, Iterable<Path> paths) throws IOException { + public static List<FileStatus> listFilesRecursively(FileSystem fs, Iterable<Path> paths) + throws IOException { List<FileStatus> results = Lists.newArrayList(); for (Path path : paths) { results.addAll(listFilesRecursively(fs, path)); @@ -77,12 +79,25 @@ public class FileListUtils { * @param includeEmptyDirectories a control to include empty directories for copy */ public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter, - boolean includeEmptyDirectories) - throws IOException { + boolean includeEmptyDirectories) throws IOException { + return listFilesToCopyAtPath(fs, path, fileFilter, false, includeEmptyDirectories); + } + + /** + * Given a path to copy, list all files rooted at the given path to copy + * + * @param fs the file system of the path + * @param path root path to copy + * @param fileFilter a filter only applied to root + * @param applyFilterToDirectories a control to decide whether to apply filter to directories + * @param includeEmptyDirectories a control to include empty directories for copy + */ + public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter, + boolean applyFilterToDirectories, boolean includeEmptyDirectories) throws IOException { List<FileStatus> files = Lists.newArrayList(); FileStatus rootFile = fs.getFileStatus(path); - listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, false, includeEmptyDirectories); + listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, applyFilterToDirectories, includeEmptyDirectories); // Copy the empty root directory if (files.size() == 0 && rootFile.isDirectory() && includeEmptyDirectories) { @@ -105,7 +120,8 @@ public class FileListUtils { * {@link PathFilter} will only be applied to files. */ public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter, - boolean applyFilterToDirectories) throws IOException { + boolean applyFilterToDirectories) + throws IOException { return listFilesRecursivelyHelper(fs, Lists.newArrayList(), fs.getFileStatus(path), fileFilter, applyFilterToDirectories, false); } @@ -114,8 +130,8 @@ public class FileListUtils { FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories, boolean includeEmptyDirectories) throws FileNotFoundException, IOException { if (fileStatus.isDirectory()) { - for (FileStatus status : fs.listStatus(fileStatus.getPath(), - applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) { + for (FileStatus status : fs + .listStatus(fileStatus.getPath(), applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) { if (status.isDirectory()) { // Number of files collected before diving into the directory int numFilesBefore = files.size(); @@ -144,11 +160,13 @@ public class FileListUtils { /** * Method to list out all files, or directory if no file exists, under a specified path. */ - public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path) throws IOException { + public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path) + throws IOException { return listMostNestedPathRecursively(fs, path, NO_OP_PATH_FILTER); } - public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Iterable<Path> paths) throws IOException { + public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Iterable<Path> paths) + throws IOException { List<FileStatus> results = Lists.newArrayList(); for (Path path : paths) { results.addAll(listMostNestedPathRecursively(fs, path)); @@ -162,12 +180,13 @@ public class FileListUtils { */ public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { - return listMostNestedPathRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), + return listMostNestedPathRecursivelyHelper(fs, Lists.<FileStatus>newArrayList(), fs.getFileStatus(path), fileFilter); } private static List<FileStatus> listMostNestedPathRecursivelyHelper(FileSystem fs, List<FileStatus> files, - FileStatus fileStatus, PathFilter fileFilter) throws IOException { + FileStatus fileStatus, PathFilter fileFilter) + throws IOException { if (fileStatus.isDirectory()) { FileStatus[] curFileStatus = fs.listStatus(fileStatus.getPath()); if (ArrayUtils.isEmpty(curFileStatus)) { @@ -189,7 +208,7 @@ public class FileListUtils { */ public static List<FileStatus> listPathsRecursively(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { - return listPathsRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter); + return listPathsRecursivelyHelper(fs, Lists.<FileStatus>newArrayList(), fs.getFileStatus(path), fileFilter); } private static List<FileStatus> listPathsRecursivelyHelper(FileSystem fs, List<FileStatus> files,
