Repository: flume Updated Branches: refs/heads/trunk 1b9e58915 -> 7d1e683fb
FLUME-2918. Speed up TaildirSource on directories with many files This patch greatly improves the performance of TaildirSource on directories that contain a large number of files. (Attila Simon via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7d1e683f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7d1e683f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7d1e683f Branch: refs/heads/trunk Commit: 7d1e683fbd7d261fff9fcf17ad78fd8469c64905 Parents: 1b9e589 Author: Mike Percy <[email protected]> Authored: Mon Jun 20 01:09:07 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Mon Jun 20 01:13:56 2016 -0700 ---------------------------------------------------------------------- .../taildir/ReliableTaildirEventReader.java | 56 ++-- .../apache/flume/source/taildir/TailFile.java | 7 - .../flume/source/taildir/TaildirMatcher.java | 278 +++++++++++++++++++ .../flume/source/taildir/TaildirSource.java | 3 + .../TaildirSourceConfigurationConstants.java | 4 + .../source/taildir/TestTaildirMatcher.java | 227 +++++++++++++++ .../flume/source/taildir/TestTaildirSource.java | 2 +- 7 files changed, 531 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 5b6d465..8128df4 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -20,18 +20,14 @@ package org.apache.flume.source.taildir; import java.io.File; -import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.nio.file.Files; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.regex.Pattern; import org.apache.flume.Event; import org.apache.flume.FlumeException; @@ -43,11 +39,9 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Table; -import com.google.common.collect.Table.Cell; import com.google.gson.stream.JsonReader; @InterfaceAudience.Private @@ -55,13 +49,14 @@ import com.google.gson.stream.JsonReader; public class ReliableTaildirEventReader implements ReliableEventReader { private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class); - private final Table<String, File, Pattern> tailFileTable; + private final List<TaildirMatcher> taildirCache; /* one TaildirMatcher per configured file group */ private final Table<String, String, String> headerTable; private TailFile currentFile = null; private Map<Long, TailFile> tailFiles = Maps.newHashMap(); private long updateTime; private boolean addByteOffset; + private boolean cachePatternMatching; /* NOTE(review): only assigned in the hunks shown here; confirm it is read elsewhere, otherwise it can be dropped */ private boolean committed = true; /** @@ -69,7 +64,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { */ private ReliableTaildirEventReader(Map<String, String> filePaths, Table<String, String, String> headerTable, String positionFilePath, - 
boolean skipToEnd, boolean addByteOffset) throws IOException { + boolean skipToEnd, boolean 
addByteOffset, boolean cachePatternMatching) throws IOException { // Sanity checks Preconditions.checkNotNull(filePaths); Preconditions.checkNotNull(positionFilePath); @@ -79,21 +74,17 @@ public class ReliableTaildirEventReader implements ReliableEventReader { new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths }); } - Table<String, File, Pattern> tailFileTable = HashBasedTable.create(); + List<TaildirMatcher> taildirCache = Lists.newArrayList(); for (Entry<String, String> e : filePaths.entrySet()) { - File f = new File(e.getValue()); - File parentDir = f.getParentFile(); - Preconditions.checkState(parentDir.exists(), - "Directory does not exist: " + parentDir.getAbsolutePath()); - Pattern fileNamePattern = Pattern.compile(f.getName()); - tailFileTable.put(e.getKey(), parentDir, fileNamePattern); + taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching)); /* the TaildirMatcher constructor throws IllegalStateException when the parent directory does not exist */ } - logger.info("tailFileTable: " + tailFileTable.toString()); + logger.info("taildirCache: " + taildirCache.toString()); logger.info("headerTable: " + headerTable.toString()); - this.tailFileTable = tailFileTable; + this.taildirCache = taildirCache; this.headerTable = headerTable; this.addByteOffset = addByteOffset; + this.cachePatternMatching = cachePatternMatching; updateTailFiles(skipToEnd); logger.info("Updating position from position file: " + positionFilePath); @@ -238,12 +229,10 @@ public class ReliableTaildirEventReader implements ReliableEventReader { updateTime = System.currentTimeMillis(); List<Long> updatedInodes = Lists.newArrayList(); - for (Cell<String, File, Pattern> cell : tailFileTable.cellSet()) { - Map<String, String> headers = headerTable.row(cell.getRowKey()); - File parentDir = cell.getColumnKey(); - Pattern fileNamePattern = cell.getValue(); + for (TaildirMatcher taildir : taildirCache) { /* each matcher may serve its file list from cache, see TaildirMatcher#getMatchingFiles */ + 
Map<String, String> headers = headerTable.row(taildir.getFileGroup()); - for (File f : getMatchFiles(parentDir, fileNamePattern)) { + for (File f : 
taildir.getMatchingFiles()) { long inode = getInode(f); TailFile tf = tailFiles.get(inode); if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { @@ -274,21 +263,6 @@ public class ReliableTaildirEventReader implements ReliableEventReader { return updateTailFiles(false); } - private List<File> getMatchFiles(File parentDir, final Pattern fileNamePattern) { - FileFilter filter = new FileFilter() { - public boolean accept(File f) { - String fileName = f.getName(); - if (f.isDirectory() || !fileNamePattern.matcher(fileName).matches()) { - return false; - } - return true; - } - }; - File[] files = parentDir.listFiles(filter); - ArrayList<File> result = Lists.newArrayList(files); - Collections.sort(result, new TailFile.CompareByLastModifiedTime()); - return result; - } private long getInode(File file) throws IOException { long inode = (long) Files.getAttribute(file.toPath(), "unix:ino"); @@ -313,6 +287,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private String positionFilePath; private boolean skipToEnd; private boolean addByteOffset; + private boolean cachePatternMatching; public Builder filePaths(Map<String, String> filePaths) { this.filePaths = filePaths; @@ -339,8 +314,13 @@ public class ReliableTaildirEventReader implements ReliableEventReader { return this; } + public Builder cachePatternMatching(boolean cachePatternMatching) { /* build() forwards this flag to every TaildirMatcher */ + this.cachePatternMatching = cachePatternMatching; + return this; + } + public ReliableTaildirEventReader build() throws IOException { - return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset); + return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset, cachePatternMatching); } } http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java ---------------------------------------------------------------------- 
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java index eabd357..cb36e41 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java @@ -212,13 +212,6 @@ public class TailFile { } } - public static class CompareByLastModifiedTime implements Comparator<File> { - @Override - public int compare(File f1, File f2) { - return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified()); - } - } - private class LineResult { final boolean lineSepInclude; final byte[] line; http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java new file mode 100644 index 0000000..245aef5 --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source.taildir; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.PathMatcher; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Identifies and caches the files matched by a single file pattern for <code>TAILDIR</code> source. + * <p></p> + * Since file patterns only apply to the fileNames and not the parent directories, this implementation + * checks the parent directory for modification (added or removed files update the modification time of the parent dir). + * If no modification happened to the parent dir then the underlying files could only have been written to, so there is no need + * to rerun the pattern matching on fileNames. + * <p></p> + * This implementation provides lazy caching or no caching. 
Instances of this class keep the result + * file list from the last successful execution + * of {@linkplain #getMatchingFiles()} function invocation, + * and may serve the content without hitting the FileSystem for performance optimization. + * <p></p> + * <b>IMPORTANT:</b> It is assumed that the hosting system provides at least second granularity for both + * <code>System.currentTimeMillis()</code> and <code>File.lastModified()</code>. It is also assumed that the system clock is used + * for file system timestamps. If that is not the case then configure it as uncached. + * This class is intended for package-internal use only. Member functions are not thread safe. + * + * @see TaildirSource + * @see ReliableTaildirEventReader + * @see TaildirSourceConfigurationConstants + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class TaildirMatcher { + private static final Logger logger = LoggerFactory.getLogger(TaildirMatcher.class); + private static final FileSystem FS = FileSystems.getDefault(); + + // flag from configuration to switch off caching completely + private final boolean cachePatternMatching; + // id from configuration + private final String fileGroup; + // plain string of the desired files from configuration + private final String filePattern; + + // directory monitored for changes + private final File parentDir; + // cached instance for filtering files based on filePattern + private final DirectoryStream.Filter<Path> fileFilter; + + // system time in milliseconds, stores the last modification time of the + // parent directory seen by the last check, rounded to seconds + // the initial value is only used by the first check, which replaces it immediately (system time is positive) + private long lastSeenParentDirMTime = -1; + // system time in milliseconds, time of the last check, rounded to seconds + // the initial value is only used by the first check, which replaces it immediately (system time is positive) + private long lastCheckedTime = -1; + // cached 
content, files which matched the pattern 
within the parent directory + private List<File> lastMatchedFiles = Lists.newArrayList(); + + /** + * Package accessible constructor. From configuration context it represents a single <code>filegroup</code> + * and encapsulates the corresponding <code>filePattern</code>. + * <code>filePattern</code> consists of two parts: first part has to be a valid path to + * an existing parent directory, second part has to be a + * valid regex {@link java.util.regex.Pattern} that is matched against the names of the non-directory entries within the parent directory (hidden files are not excluded). + * A valid example for filePattern is <code>/dir0/dir1/.*</code> given <code>/dir0/dir1</code> + * is an existing directory structure readable by the running user. + * <p></p> + * An instance of this class is created for each fileGroup. + * + * @param fileGroup arbitrary name of the group given by the config + * @param filePattern parent directory plus regex pattern. No wildcards are allowed in directory name + * @param cachePatternMatching default true, recommended in every setup especially with huge parent directories. 
+ * Set to false when the local system clock is not used for stamping mtime (e.g. remote filesystems) + * @throws IllegalStateException if filePattern has no parent directory part or the parent directory does not exist + * @see TaildirSourceConfigurationConstants + */ + TaildirMatcher(String fileGroup, String filePattern, boolean cachePatternMatching) { + // store whatever came from configuration + this.fileGroup = fileGroup; + this.filePattern = filePattern; + this.cachePatternMatching = cachePatternMatching; + + // calculate final members + File f = new File(filePattern); + this.parentDir = f.getParentFile(); + String regex = f.getName(); + final PathMatcher matcher = FS.getPathMatcher("regex:" + regex); + this.fileFilter = new DirectoryStream.Filter<Path>() { + @Override + public boolean accept(Path entry) throws IOException { + return matcher.matches(entry.getFileName()) && !Files.isDirectory(entry); + } + }; + + // sanity checks (getParentFile() returns null when filePattern has no directory part) + Preconditions.checkState(parentDir != null, "File pattern has no parent directory: " + filePattern); + Preconditions.checkState(parentDir.exists(), + "Directory does not exist: " + parentDir.getAbsolutePath()); + } + + /** + * Lists those files within the parentDir that match regex pattern passed in during object instantiation. + * Designed for frequent periodic invocation (e.g. by {@link org.apache.flume.source.PollableSourceRunner}). + * <p></p> + * Based on the modification of the parentDir this function may trigger cache recalculation by calling + * {@linkplain #getMatchingFilesNoCache()} or + * return the value stored in {@linkplain #lastMatchedFiles}. + * Parentdir is allowed to be a symbolic link. + * <p></p> + * Files returned by this call are weakly consistent (see {@link DirectoryStream}). 
+ * It does not freeze the directory while iterating, + * so it may (or may not) reflect updates to the directory that occur during the call, + * in which case the next call + * will return those files (as mtime is increasing it won't hit cache but trigger recalculation). + * It is guaranteed that invocation reflects every change which was observable at the time of invocation. 
+ * <p></p> + * Matching file list recalculation is triggered when caching was turned off or + * if mtime is greater than the previously seen mtime + * (including the case when the cache hasn't been calculated before). + * Additionally if a constantly updated directory was configured as parentDir + * then multiple changes to the parentDir may happen + * within the same second so in such case (assuming at least second granularity of reported mtime) + * it is impossible to tell whether a change of the dir happened before the check or after + * (unless the check happened after that second). + * Therefore the implementation also stores the system time of the previous invocation, and the previous invocation has to + * happen strictly after the current mtime to avoid further cache refresh + * (because then it is guaranteed that previous invocation resulted in valid cache content). + * If system clock hasn't passed the second of + * the current mtime then logic expects more changes as well + * (since it cannot be sure that there won't be any further changes still in that second + * and it would like to avoid data loss in first place) + * hence it recalculates matching files. If system clock finally + * passed actual mtime then a subsequent invocation guarantees that it picked up every + * change from the passed second so + * any further invocations can be served from cache associated with that second (given mtime is not updated again). + * + * @return Unmodifiable list of files matching the pattern sorted by last modification time. No recursion. No directories. + * If nothing matches then returns an empty list. If I/O issue occurred then returns the list collected to the point + * when exception was thrown. 
+ * + * @see #getMatchingFilesNoCache() + */ + List<File> getMatchingFiles() { + long now = TimeUnit.SECONDS.toMillis(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); + long currentParentDirMTime = parentDir.lastModified(); + + // calculate matched files if + // - we don't want to use cache (recalculate every time) OR + // - directory was clearly updated after the last check OR + // - last mtime change wasn't already checked for sure (system clock hasn't passed that second yet) + if (!cachePatternMatching || + lastSeenParentDirMTime < currentParentDirMTime || + !(currentParentDirMTime < lastCheckedTime)) { + lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache()); + lastSeenParentDirMTime = currentParentDirMTime; + lastCheckedTime = now; + } + + // read-only view: callers must not be able to corrupt the cached list + return Collections.unmodifiableList(lastMatchedFiles); + } + + /** + * Provides the actual files within the parentDir whose + * names match the regex pattern. Each invocation uses {@link DirectoryStream} + * to identify matching files. + * + * Files returned by this call are weakly consistent (see {@link DirectoryStream}). It does not freeze the directory while iterating, + * so it may (or may not) reflect updates to the directory that occur during the call, in which case the next call + * will return those files. + * + * @return List of files matching the pattern unsorted. No recursion. No directories. + * If nothing matches then returns an empty list. If I/O issue occurred then returns the list collected to the point + * when exception was thrown. + * + * @see DirectoryStream + * @see DirectoryStream.Filter + */ + private List<File> getMatchingFilesNoCache() { + List<File> result = Lists.newArrayList(); + try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) { + for (Path entry : stream) { + result.add(entry.toFile()); + } + } catch (IOException e) { + logger.error("I/O exception occurred while listing parent directory. 
Files already matched will be returned. 
" + + parentDir.toPath(), e); + } + return result; + } + + /** + * Utility function to sort matched files based on last modification time. + * Sorting itself uses only a snapshot of last modification times captured before the sorting to keep the + * number of stat system calls to the required minimum. + * + * @param files list of files in any order + * @return the same list instance, sorted in place + */ + private static List<File> sortByLastModifiedTime(List<File> files) { + final HashMap<File, Long> lastModificationTimes = new HashMap<File, Long>(files.size()); + for (File f: files) { + lastModificationTimes.put(f, f.lastModified()); + } + Collections.sort(files, new Comparator<File>() { + @Override + public int compare(File o1, File o2) { + return lastModificationTimes.get(o1).compareTo(lastModificationTimes.get(o2)); + } + }); + + return files; + } + + @Override + public String toString() { + return "{" + + "filegroup='" + fileGroup + '\'' + + ", filePattern='" + filePattern + '\'' + + ", cached=" + cachePatternMatching + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TaildirMatcher that = (TaildirMatcher) o; + + return fileGroup.equals(that.fileGroup); + + } + + @Override + public int hashCode() { + return fileGroup.hashCode(); + } + + public String getFileGroup() { + return fileGroup; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index 8816327..dfb5b29 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -78,6 +78,7 @@ public class TaildirSource extends AbstractSource implements private int checkIdleInterval = 5000; private int writePosInitDelay = 5000; private int writePosInterval; + private boolean cachePatternMatching; /* set from the CACHE_PATTERN_MATCHING property */ private List<Long> existingInodes = new CopyOnWriteArrayList<Long>(); private List<Long> idleInodes = new CopyOnWriteArrayList<Long>(); @@ -94,6 +95,7 @@ public class TaildirSource extends AbstractSource implements .positionFilePath(positionFilePath) .skipToEnd(skipToEnd) .addByteOffset(byteOffsetHeader) + .cachePatternMatching(cachePatternMatching) .build(); } catch (IOException e) { throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); @@ -166,6 +168,7 @@ public class TaildirSource extends AbstractSource implements byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER); idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT); writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL); + cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING, DEFAULT_CACHE_PATTERN_MATCHING); backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT , PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java index 6165276..b0c934d 100644 --- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -49,4 +49,8 @@ public class TaildirSourceConfigurationConstants { public static final String BYTE_OFFSET_HEADER = "byteOffsetHeader"; public static final String BYTE_OFFSET_HEADER_KEY = "byteoffset"; public static final boolean DEFAULT_BYTE_OFFSET_HEADER = false; + + /** Whether to cache the list of files matching the specified file patterns until the parent directory is modified. Disable it when file system timestamps are not stamped by the local system clock (e.g. remote file systems). */ + public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching"; + public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true; } http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java new file mode 100644 index 0000000..4bff841 --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source.taildir; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class TestTaildirMatcher { + private File tmpDir; + private Map<String, File> files; + private boolean isCachingNeeded = true; + + final String msgAlreadyExistingFile = "a file was not found but it was created before matcher"; + final String msgAfterNewFileCreated = "files which were created after last check are not found"; + final String msgAfterAppend = "a file was not found although it was just appended within the dir"; + final String msgEmptyDir = "empty dir should return an empty list"; + final String msgNoMatch = "no match should return an empty list"; + final String msgSubDirs = "only files on the same level as the pattern should be returned"; + final String msgNoChange = "a file that wasn't touched after the last check was not found"; + final String msgAfterDelete = "file was returned even after it was deleted"; + + /** + * Append a line to the specified file within tmpDir. + * If file doesn't exist it will be created. 
+ */ + private void append(String fileName) throws IOException { + File f; + if(!files.containsKey(fileName)){ + f = new File(tmpDir, fileName); + files.put(fileName, f); + }else{ + f = files.get(fileName); + } + Files.append(fileName + "line\n", f, Charsets.UTF_8); + } + + /** + * Translate a list of files to list of filename strings. + */ + private static List<String> filesToNames(List<File> origList){ + Function<File, String> file2nameFn = new Function<File, String>() { + @Override + public String apply(File input) { + return input.getName(); + } + }; + return Lists.transform(origList, file2nameFn); + } + + @Before + public void setUp() throws Exception { + files = Maps.newHashMap(); + tmpDir = Files.createTempDir(); + } + + @After + public void tearDown() throws Exception { + for (File f : tmpDir.listFiles()) { + if (f.isDirectory()) { + for (File sdf : f.listFiles()) { + sdf.delete(); + } + } + f.delete(); + } + tmpDir.delete(); + files = null; + } + + @Test + public void getMatchingFiles() throws Exception { + append("file0"); + append("file1"); + + TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + "file.*", isCachingNeeded); + List<String> files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAlreadyExistingFile, 2, files.size()); + assertTrue(msgAlreadyExistingFile, files.contains("file1")); + + append("file1"); + files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAfterAppend, 2, files.size()); + assertTrue(msgAfterAppend, files.contains("file0")); + assertTrue(msgAfterAppend, files.contains("file1")); + + append("file2"); + append("file3"); + files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAfterNewFileCreated, 4, files.size()); + assertTrue(msgAfterNewFileCreated, files.contains("file0")); + assertTrue(msgAfterNewFileCreated, files.contains("file1")); + assertTrue(msgAfterNewFileCreated, files.contains("file2")); + assertTrue(msgAfterNewFileCreated, files.contains("file3")); + 
this.files.get("file0").delete(); + files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAfterDelete, 3, files.size()); + assertFalse(msgAfterDelete, files.contains("file0")); + assertTrue(msgNoChange, files.contains("file1")); + assertTrue(msgNoChange, files.contains("file2")); + assertTrue(msgNoChange, files.contains("file3")); + } + + @Test + public void getMatchingFilesNoCache() throws Exception { + append("file0"); + append("file1"); + + TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + "file.*", false); + List<String> files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAlreadyExistingFile, 2, files.size()); + assertTrue(msgAlreadyExistingFile, files.contains("file1")); + + append("file1"); + files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAfterAppend, 2, files.size()); + assertTrue(msgAfterAppend, files.contains("file0")); + assertTrue(msgAfterAppend, files.contains("file1")); + + append("file2"); + append("file3"); + files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAfterNewFileCreated, 4, files.size()); + assertTrue(msgAfterNewFileCreated, files.contains("file0")); + assertTrue(msgAfterNewFileCreated, files.contains("file1")); + assertTrue(msgAfterNewFileCreated, files.contains("file2")); + assertTrue(msgAfterNewFileCreated, files.contains("file3")); + + this.files.get("file0").delete(); + files = filesToNames(tm.getMatchingFiles()); + assertEquals(msgAfterDelete, 3, files.size()); + assertFalse(msgAfterDelete, files.contains("file0")); + assertTrue(msgNoChange, files.contains("file1")); + assertTrue(msgNoChange, files.contains("file2")); + assertTrue(msgNoChange, files.contains("file3")); + } + + @Test + public void testEmptyDirMatching() throws Exception { + TaildirMatcher tm = new TaildirMatcher("empty", tmpDir.getAbsolutePath() + File.separator + ".*", isCachingNeeded); + List<File> files = tm.getMatchingFiles(); + assertNotNull(msgEmptyDir, files); + 
assertTrue(msgEmptyDir, files.isEmpty()); + } + + @Test + public void testNoMatching() throws Exception { + TaildirMatcher tm = new TaildirMatcher("nomatch", tmpDir.getAbsolutePath() + File.separator + "abracadabra_nonexisting", isCachingNeeded); + List<File> files = tm.getMatchingFiles(); + assertNotNull(msgNoMatch, files); + assertTrue(msgNoMatch, files.isEmpty()); + } + + @Test(expected = IllegalStateException.class) + public void testNonExistingDir() { + new TaildirMatcher("exception", "/abracadabra/doesntexist/.*", isCachingNeeded); + } + + @Test + public void testDirectoriesAreNotListed() throws Exception { + new File(tmpDir, "outerFile").createNewFile(); + new File(tmpDir, "recursiveDir").mkdir(); + new File(tmpDir + File.separator + "recursiveDir", "innerFile").createNewFile(); + TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + ".*", isCachingNeeded); + List<String> files = filesToNames(tm.getMatchingFiles()); + + assertEquals(msgSubDirs, 1, files.size()); + assertTrue(msgSubDirs, files.contains("outerFile")); + } + + @Test + public void testRegexFileNameFiltering() throws IOException { + append("a.log"); + append("a.log.1"); + append("b.log"); + append("c.log.yyyy.MM-01"); + append("c.log.yyyy.MM-02"); + + // Tail a.log and b.log + TaildirMatcher tm1 = new TaildirMatcher("ab", tmpDir.getAbsolutePath() + File.separator + "[ab].log", isCachingNeeded); + // Tail files that start with c.log + TaildirMatcher tm2 = new TaildirMatcher("c", tmpDir.getAbsolutePath() + File.separator + "c.log.*", isCachingNeeded); + + List<String> files1 = filesToNames(tm1.getMatchingFiles()); + List<String> files2 = filesToNames(tm2.getMatchingFiles()); + + assertEquals(2, files1.size()); + assertEquals(2, files2.size()); + // Make sure we got every file + assertTrue("Regex pattern for ab should have matched a.log file", files1.contains("a.log")); + assertFalse("Regex pattern for ab should NOT have matched a.log.1 file", 
files1.contains("a.log.1")); + assertTrue("Regex pattern for ab should have matched b.log file", files1.contains("b.log")); + assertTrue("Regex pattern for c should have matched c.log.yyyy.MM-01 file", files2.contains("c.log.yyyy.MM-01")); + assertTrue("Regex pattern for c should have matched c.log.yyyy.MM-02 file", files2.contains("c.log.yyyy.MM-02")); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/7d1e683f/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java index f9e614c..f6289cd 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -77,7 +77,7 @@ public class TestTaildirSource { } @Test - public void testRegexFileNameFiltering() throws IOException { + public void testRegexFileNameFilteringEndToEnd() throws IOException { File f1 = new File(tmpDir, "a.log"); File f2 = new File(tmpDir, "a.log.1"); File f3 = new File(tmpDir, "b.log");
