Repository: flume Updated Branches: refs/heads/trunk cfbf11568 -> 7013708ba
FLUME-2939. Update recursive SpoolDir source to use Java 7 APIs (Bessenyei Balázs Donát via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7013708b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7013708b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7013708b Branch: refs/heads/trunk Commit: 7013708baddc8ed7d861797d1fd8280a94b6025c Parents: cfbf115 Author: Mike Percy <[email protected]> Authored: Fri Jul 8 17:32:09 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Jul 8 17:32:09 2016 -0700 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 95 ++++++++++---------- 1 file changed, 49 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/7013708b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index 4dc0207..ca5308c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.io.Files; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; @@ -43,10 +42,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.charset.Charset; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -160,8 +163,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { try { File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary", spoolDirectory); - Files.write("testing flume file permissions\n", canary, Charsets.UTF_8); - List<String> lines = Files.readLines(canary, Charsets.UTF_8); + Files.write(canary.toPath(), "testing flume file permissions\n".getBytes()); + List<String> lines = Files.readAllLines(canary.toPath(), Charsets.UTF_8); Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary); if (!canary.delete()) { throw new IOException("Unable to delete canary file " + canary); @@ -216,49 +219,46 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } /** - * Filter to exclude files/directories either hidden, finished, or names matching the ignore pattern - */ - final FileFilter filter = new FileFilter() { - public boolean accept(File candidate) { - if (candidate.isDirectory()) { - String directoryName = candidate.getName(); - if (!recursiveDirectorySearch || - directoryName.startsWith(".") || - ignorePattern.matcher(directoryName).matches()) { - - return false; - } - return true; - } - String fileName = candidate.getName(); - if (fileName.endsWith(completedSuffix) || - fileName.startsWith(".") || - ignorePattern.matcher(fileName).matches()) { - return false; - } - - return true; - } - }; - - /** * Recursively gather candidate files * @param directory the directory to gather files from * @return list of files within the passed in directory */ - private List<File> getCandidateFiles(File directory) { + private List<File> getCandidateFiles(final Path directory) { Preconditions.checkNotNull(directory); - List<File> candidateFiles = new ArrayList<File>(); - if (!directory.isDirectory()) { - return candidateFiles; - } + final List<File> candidateFiles = new ArrayList<>(); + try { + Files.walkFileTree(directory, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + if (directory.equals(dir)) { // The top directory should always be listed + return FileVisitResult.CONTINUE; + } + String directoryName = dir.getFileName().toString(); + if (!recursiveDirectorySearch || + directoryName.startsWith(".") || + ignorePattern.matcher(directoryName).matches()) { + return FileVisitResult.SKIP_SUBTREE; + } + return FileVisitResult.CONTINUE; + } - for (File file : directory.listFiles(filter)) { - if (file.isDirectory()) { - candidateFiles.addAll(getCandidateFiles(file)); - } else { - candidateFiles.add(file); - } + @Override + public FileVisitResult visitFile(Path candidate, BasicFileAttributes attrs) + throws IOException { + String fileName = candidate.getFileName().toString(); + if (!fileName.endsWith(completedSuffix) && + !fileName.startsWith(".") && + !ignorePattern.matcher(fileName).matches()) { + candidateFiles.add(candidate.toFile()); + } + + return FileVisitResult.CONTINUE; + } + }); + } catch (IOException e) { + logger.error("I/O exception occurred while listing directories. " + + "Files already matched will be returned. " + directory, e); } return candidateFiles; @@ -315,7 +315,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { * If so, try to roll to the next file, if there is one. * Loop until events is not empty or there is no next file in case of 0 byte files */ while (events.isEmpty()) { - logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one."); + logger.info("Last read took us just up to a file boundary. " + + "Rolling to the next file, if there is one."); retireCurrentFile(); currentFile = getNextFile(); if (!currentFile.isPresent()) { @@ -417,7 +418,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { * file was already rolled but the rename was not atomic. If that seems * likely, we let it pass with only a warning. */ - if (Files.equal(currentFile.get().getFile(), dest)) { + if (Files.isSameFile(currentFile.get().getFile().toPath(), dest.toPath())) { logger.warn("Completed file " + dest + " already exists, but files match, so continuing."); boolean deleted = fileToRoll.delete(); @@ -494,7 +495,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { if (consumeOrder != ConsumeOrder.RANDOM || candidateFileIter == null || !candidateFileIter.hasNext()) { - candidateFiles = getCandidateFiles(spoolDirectory); + candidateFiles = getCandidateFiles(spoolDirectory.toPath()); listFilesCount++; candidateFileIter = candidateFiles.iterator(); } @@ -540,7 +541,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { /** * Opens a file for consuming * @param file - * @return {@link #FileInfo} for the file to consume or absent option if the + * @return {@link FileInfo} for the file to consume or absent option if the * file does not exists or readable. */ private Optional<FileInfo> openFile(File file) { @@ -584,7 +585,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } - /** An immutable class with information about a file being processed. */ + /** + * An immutable class with information about a file being processed. + */ private static class FileInfo { private final File file; private final long length;
