Repository: incubator-gobblin Updated Branches: refs/heads/master 3ab02b3f2 -> 42677dc8c
[GOBBLIN-183] Gobblin data management copy empty directories Closes #2036 from zxcware/empty Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/42677dc8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/42677dc8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/42677dc8 Branch: refs/heads/master Commit: 42677dc8ccd32262006f7d77c4d40c7735ed71e5 Parents: 3ab02b3 Author: zhchen <zhc...@linkedin.com> Authored: Wed Aug 9 11:04:03 2017 -0700 Committer: Issac Buenrostro <ibuen...@apache.org> Committed: Wed Aug 9 11:04:03 2017 -0700 ---------------------------------------------------------------------- .../data/management/copy/CopyConfiguration.java | 5 ++ .../data/management/copy/CopyableFile.java | 2 +- .../copy/RecursiveCopyableDataset.java | 9 +++- .../management/copy/RecursivePathFinder.java | 9 +++- .../FileAwareInputStreamExtractor.java | 4 ++ .../writer/FileAwareInputStreamDataWriter.java | 5 ++ .../org/apache/gobblin/util/FileListUtils.java | 50 +++++++++++++++++--- .../gobblin/util/io/EmptyInputStream.java | 36 ++++++++++++++ .../apache/gobblin/util/FileListUtilsTest.java | 47 ++++++++++++++++++ 9 files changed, 156 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java index b8f0365..82cca49 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java +++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java @@ -51,6 +51,11 @@ public class CopyConfiguration { public static final String DESTINATION_GROUP_KEY = COPY_PREFIX + ".dataset.destination.group"; public static final String PRIORITIZATION_PREFIX = COPY_PREFIX + ".prioritization"; + /** + * Include empty directories in the source for copy + */ + public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories"; + public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias"; public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java index f2cb933..9c729e3 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java @@ -203,7 +203,7 @@ public class CopyableFile extends CopyEntity implements File { this.configuration.getTargetFs(), this.destination); } if (this.checksum == null) { - FileChecksum checksumTmp = this.originFs.getFileChecksum(this.origin.getPath()); + FileChecksum checksumTmp = this.origin.isDirectory() ? null : this.originFs.getFileChecksum(this.origin.getPath()); this.checksum = checksumTmp == null ? 
new byte[0] : checksumTmp.getBytes(); } if (this.fileSet == null) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index e24752a..34428f1 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -65,7 +65,12 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData private final CopyableFileFilter copyableFileFilter; private final boolean update; private final boolean delete; + + // Include empty directories in the source for copy + private final boolean includeEmptyDirectories; + // Delete empty directories in the destination private final boolean deleteEmptyDirectories; + private final Properties properties; public RecursiveCopyableDataset(final FileSystem fs, Path rootPath, Properties properties, Path glob) { @@ -80,6 +85,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData this.update = Boolean.parseBoolean(properties.getProperty(UPDATE_KEY)); this.delete = Boolean.parseBoolean(properties.getProperty(DELETE_KEY)); this.deleteEmptyDirectories = Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY)); + this.includeEmptyDirectories = + Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES)); this.properties = properties; } @@ -150,7 +157,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData @VisibleForTesting protected 
List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { try { - return FileListUtils.listFilesRecursively(fs, path, fileFilter); + return FileListUtils.listFilesToCopyAtPath(fs, path, fileFilter, includeEmptyDirectories); } catch (FileNotFoundException fnfe) { return Lists.newArrayList(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java index ec608b2..b749996 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java @@ -46,22 +46,27 @@ public class RecursivePathFinder { private final Path rootPath; private final FileSystem fs; private final PathFilter pathFilter; + private final boolean includeEmptyDirectories; public RecursivePathFinder(final FileSystem fs, Path rootPath, Properties properties) { this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath); this.fs = fs; this.pathFilter = DatasetUtils.instantiatePathFilter(properties); + this.includeEmptyDirectories = + Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES)); } - public Set<FileStatus> getPaths(boolean skipHiddenPaths) throws IOException { + public Set<FileStatus> getPaths(boolean skipHiddenPaths) + throws IOException { if (!this.fs.exists(this.rootPath)) { return Sets.newHashSet(); } PathFilter actualFilter = skipHiddenPaths ? 
new AndPathFilter(new HiddenFilter(), this.pathFilter) : this.pathFilter; - List<FileStatus> files = FileListUtils.listFilesRecursively(this.fs, this.rootPath, actualFilter); + List<FileStatus> files = + FileListUtils.listFilesToCopyAtPath(this.fs, this.rootPath, actualFilter, includeEmptyDirectories); return Sets.newHashSet(files); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java index 5e53344..406c828 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java @@ -23,6 +23,7 @@ import org.apache.gobblin.data.management.copy.FileAwareInputStream; import org.apache.gobblin.source.extractor.DataRecordException; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.io.EmptyInputStream; import org.apache.gobblin.util.io.MeteredInputStream; import java.io.IOException; @@ -80,6 +81,9 @@ public class FileAwareInputStreamExtractor implements Extractor<String, FileAwar this.state == null ? 
HadoopUtils.newConfiguration() : HadoopUtils.getConfFromState(this.state); FileSystem fsFromFile = this.file.getOrigin().getPath().getFileSystem(conf); this.recordRead = true; + if (this.file.getFileStatus().isDirectory()) { + return new FileAwareInputStream(this.file, EmptyInputStream.instance); + } return new FileAwareInputStream(this.file, MeteredInputStream.builder().in(fsFromFile.open(this.file.getFileStatus().getPath())).build()); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java index 3ce204d..905ac02 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java @@ -206,6 +206,11 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA log.info(String.format("Recovering persisted file %s to %s.", persistedFile.get().getPath(), writeAt)); this.fs.rename(persistedFile.get().getPath(), writeAt); } else { + // Copy empty directories + if (copyableFile.getFileStatus().isDirectory()) { + this.fs.mkdirs(writeAt); + return; + } OutputStream os = this.fs.create(writeAt, true, this.fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java ---------------------------------------------------------------------- diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java index 9472db6..02920c2 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java @@ -69,12 +69,35 @@ public class FileListUtils { } /** + * Given a path to copy, list all files rooted at the given path to copy, optionally including empty directories + * + * @param fs the file system of the path + * @param path root path to copy + * @param fileFilter a filter applied to file paths only; directories are always traversed + * @param includeEmptyDirectories a control to include empty directories for copy + */ + public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter, + boolean includeEmptyDirectories) + throws IOException { + List<FileStatus> files = Lists.newArrayList(); + FileStatus rootFile = fs.getFileStatus(path); + + listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, false, includeEmptyDirectories); + + // Copy the empty root directory + if (files.isEmpty() && rootFile.isDirectory() && includeEmptyDirectories) { + files.add(rootFile); + } + return files; + } + + /** * Helper method to list out all files under a specified path. The specified {@link PathFilter} is treated as a file * filter, that is it is only applied to file {@link Path}s.
*/ public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { - return listFilesRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter, false); + return listFilesRecursively(fs, path, fileFilter, false); } /** @@ -83,20 +106,33 @@ public class FileListUtils { */ public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter, boolean applyFilterToDirectories) throws IOException { - return listFilesRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter, - applyFilterToDirectories); + return listFilesRecursivelyHelper(fs, Lists.newArrayList(), fs.getFileStatus(path), fileFilter, + applyFilterToDirectories, false); } private static List<FileStatus> listFilesRecursivelyHelper(FileSystem fs, List<FileStatus> files, - FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories) + FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories, boolean includeEmptyDirectories) throws FileNotFoundException, IOException { if (fileStatus.isDirectory()) { for (FileStatus status : fs.listStatus(fileStatus.getPath(), applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) { - if (fileStatus.isDirectory()) { - listFilesRecursivelyHelper(fs, files, status, fileFilter, applyFilterToDirectories); + if (status.isDirectory()) { + // Number of files collected before diving into the directory + int numFilesBefore = files.size(); + + listFilesRecursivelyHelper(fs, files, status, fileFilter, applyFilterToDirectories, includeEmptyDirectories); + + // Number of files collected after diving into the directory + int numFilesAfter = files.size(); + if (numFilesAfter == numFilesBefore && includeEmptyDirectories) { + /* + * This is effectively an empty directory, which needs explicit copying.
Had there been any data file + * in the directory, the directory would have been created as a side-effect of copying that file + */ + files.add(status); + } } else { - files.add(fileStatus); + listFilesRecursivelyHelper(fs, files, status, fileFilter, applyFilterToDirectories, includeEmptyDirectories); // recurse so fileFilter is applied to this file } } } else if (fileFilter.accept(fileStatus.getPath())) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java new file mode 100644 index 0000000..3516866 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ + +package org.apache.gobblin.util.io; + +import java.io.IOException; +import java.io.InputStream; + + +/** + * An {@link InputStream} with no content: every {@link #read()} immediately reports end of stream (-1) + */ +public class EmptyInputStream extends InputStream { + public static final InputStream instance = new EmptyInputStream(); + + private EmptyInputStream() {} + @Override + public int read() + throws IOException { + return -1; // end of stream, per the InputStream#read() contract + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java index 415ae54..388e311 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java @@ -146,4 +146,51 @@ public class FileListUtilsTest { } } + public void testListFilesToCopyAtPath() throws IOException { + FileSystem localFs = FileSystem.getLocal(new Configuration()); + Path baseDir = new Path(FILE_UTILS_TEST_DIR, "fileListTestDir4"); + try { + if (localFs.exists(baseDir)) { + localFs.delete(baseDir, true); + } + localFs.mkdirs(baseDir); + + // Empty root directory + List<FileStatus> testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true); + Assert.assertEquals(testFiles.size(), 1); + Assert.assertEquals(testFiles.get(0).getPath().getName(), baseDir.getName()); + + // With an empty sub directory + Path subDir = new Path(baseDir, "subDir"); + localFs.mkdirs(subDir); + testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true); + Assert.assertEquals(testFiles.size(), 1); + Assert.assertEquals(testFiles.get(0).getPath().getName(), subDir.getName()); + + // Disable include empty directories +
testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, false); + Assert.assertEquals(testFiles.size(), 0); + + // With file subDir/test1 + Path test1Path = new Path(subDir, TEST_FILE_NAME1); + localFs.create(test1Path).close(); // close the stream so the handle is not leaked + testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true); + Assert.assertEquals(testFiles.size(), 1); + Assert.assertEquals(testFiles.get(0).getPath().getName(), test1Path.getName()); + + // With file subDir/test2 + Path test2Path = new Path(subDir, TEST_FILE_NAME2); + localFs.create(test2Path).close(); // close the stream so the handle is not leaked + testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true); + Assert.assertEquals(testFiles.size(), 2); + Set<String> fileNames = Sets.newHashSet(); + for (FileStatus testFileStatus : testFiles) { + fileNames.add(testFileStatus.getPath().getName()); + } + Assert.assertTrue(fileNames.contains(TEST_FILE_NAME1) && fileNames.contains(TEST_FILE_NAME2)); + } finally { + localFs.delete(baseDir, true); + } + } + }