Updated Branches: refs/heads/trunk b447570ed -> 750809c70
FLUME-1732: SpoolableDirectorySource should have configurable support for deleting files it has already completed instead of renaming (Mike Percy via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/750809c7 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/750809c7 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/750809c7 Branch: refs/heads/trunk Commit: 750809c70aa92225ed8df2ded0364d30a220405b Parents: b447570 Author: Brock Noland <[email protected]> Authored: Tue Jan 15 17:24:17 2013 -0800 Committer: Brock Noland <[email protected]> Committed: Tue Jan 15 17:24:17 2013 -0800 ---------------------------------------------------------------------- .../apache/flume/client/avro/AvroCLIClient.java | 8 +- .../avro/ReliableSpoolingFileEventReader.java | 195 ++++++++++++--- .../apache/flume/source/SpoolDirectorySource.java | 31 ++-- ...SpoolDirectorySourceConfigurationConstants.java | 5 +- .../avro/TestReliableSpoolingFileEventReader.java | 179 +++++++++++++ .../client/avro/TestSpoolingFileLineReader.java | 12 +- .../TestReliableSpoolingFileEventReader.java | 157 ------------ 7 files changed, 373 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/750809c7/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java index da23a75..3c8c267 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java @@ -36,7 +36,6 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; @@ -204,11 +203,8 @@ public class AvroCLIClient { if (fileName != null) { reader = new SimpleTextLineEventReader(new FileReader(new File(fileName))); } else if (dirName != null) { - reader = new ReliableSpoolingFileEventReader( - new File(dirName), ".COMPLETED", - "^$", new File(new File(dirName), ".flumespool"), - false, "", - "LINE", new Context()); + reader = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(new File(dirName)).build(); } else { reader = new SimpleTextLineEventReader(new InputStreamReader(System.in)); } http://git-wip-us.apache.org/repos/asf/flume/blob/750809c7/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index f2d587f..b19d0ea 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -32,6 +32,7 @@ import org.apache.flume.FlumeException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.serialization.*; +import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants; import org.apache.flume.tools.PlatformDetect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { static final String metaFileName = ".flumespool-main.meta"; - private final File directory; + private final File spoolDirectory; private final String completedSuffix; private final String deserializerType; private final Context deserializerContext; @@ -82,6 +83,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private final File metaFile; private final boolean annotateFileName; private final String fileNameHeader; + private final String deletePolicy; private Optional<FileInfo> currentFile = Optional.absent(); /** Always contains the last file from which lines have been read. **/ @@ -91,35 +93,44 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { /** * Create a ReliableSpoolingFileEventReader to watch the given directory. */ - public ReliableSpoolingFileEventReader(File directory, String completedSuffix, - String ignorePattern, File trackerDirectory, + private ReliableSpoolingFileEventReader(File spoolDirectory, + String completedSuffix, String ignorePattern, String trackerDirPath, boolean annotateFileName, String fileNameHeader, - String deserializerType, Context deserializerContext) throws IOException { + String deserializerType, Context deserializerContext, + String deletePolicy) throws IOException { // Sanity checks - Preconditions.checkNotNull(directory); + Preconditions.checkNotNull(spoolDirectory); Preconditions.checkNotNull(completedSuffix); Preconditions.checkNotNull(ignorePattern); - Preconditions.checkNotNull(trackerDirectory); + Preconditions.checkNotNull(trackerDirPath); Preconditions.checkNotNull(deserializerType); Preconditions.checkNotNull(deserializerContext); + Preconditions.checkNotNull(deletePolicy); + + // validate delete policy + if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) && + !deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) { + throw new IllegalArgumentException("Delete policies other than " + + "NEVER and IMMEDIATE are not yet supported"); + } if (logger.isDebugEnabled()) { logger.debug("Initializing {} with directory={}, metaDir={}, " + "deserializer={}", new Object[] { ReliableSpoolingFileEventReader.class.getSimpleName(), - directory, trackerDirectory, deserializerType }); + spoolDirectory, trackerDirPath, deserializerType }); } // Verify directory exists and is readable/writable - Preconditions.checkState(directory.exists(), - "Directory does not exist: " + directory.getAbsolutePath()); - Preconditions.checkState(directory.isDirectory(), - "Path is not a directory: " + directory.getAbsolutePath()); + Preconditions.checkState(spoolDirectory.exists(), + "Directory does not exist: " + spoolDirectory.getAbsolutePath()); + Preconditions.checkState(spoolDirectory.isDirectory(), + "Path is not a directory: " + spoolDirectory.getAbsolutePath()); // Do a canary test to make sure we have access to spooling directory try { - File f1 = File.createTempFile("flume", "test", directory); + File f1 = File.createTempFile("flume", "test", spoolDirectory); Files.write("testing flume file permissions\n", f1, Charsets.UTF_8); Files.readLines(f1, Charsets.UTF_8); if (!f1.delete()) { @@ -127,15 +138,24 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } catch (IOException e) { throw new FlumeException("Unable to read and modify files" + - " in the spooling directory: " + directory, e); + " in the spooling directory: " + spoolDirectory, e); } - this.directory = directory; + + this.spoolDirectory = spoolDirectory; this.completedSuffix = completedSuffix; this.deserializerType = deserializerType; this.deserializerContext = deserializerContext; this.annotateFileName = annotateFileName; this.fileNameHeader = fileNameHeader; this.ignorePattern = Pattern.compile(ignorePattern); + this.deletePolicy = deletePolicy; + + File trackerDirectory = new File(trackerDirPath); + + // if relative path, treat as relative to spool directory + if (!trackerDirectory.isAbsolute()) { + trackerDirectory = new File(spoolDirectory, trackerDirPath); + } // ensure that meta directory exists if (!trackerDirectory.exists()) { @@ -248,27 +268,43 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private void retireCurrentFile() throws IOException { Preconditions.checkState(currentFile.isPresent()); - String currPath = currentFile.get().getFile().getAbsolutePath(); - String newPath = currPath + completedSuffix; - logger.info("Preparing to move file {} to {}", currPath, newPath); + File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath()); currentFile.get().getDeserializer().close(); - File fileToRoll = new File(currPath); // Verify that spooling assumptions hold if (fileToRoll.lastModified() != currentFile.get().getLastModified()) { - String message = "File has been modified since being read: " + currPath; + String message = "File has been modified since being read: " + fileToRoll; throw new IllegalStateException(message); } if (fileToRoll.length() != currentFile.get().getLength()) { - String message = "File has changed size since being read: " + currPath; + String message = "File has changed size since being read: " + fileToRoll; throw new IllegalStateException(message); } - File destination = new File(newPath); + if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) { + rollCurrentFile(fileToRoll); + } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) { + deleteCurrentFile(fileToRoll); + } else { + // TODO: implement delay in the future + throw new IllegalArgumentException("Unsupported delete policy: " + + deletePolicy); + } + } + + /** + * Rename the given spooled file + * @param fileToRoll + * @throws IOException + */ + private void rollCurrentFile(File fileToRoll) throws IOException { + + File dest = new File(fileToRoll.getPath() + completedSuffix); + logger.info("Preparing to move file {} to {}", fileToRoll, dest); // Before renaming, check whether destination file name exists - if (destination.exists() && PlatformDetect.isWindows()) { + if (dest.exists() && PlatformDetect.isWindows()) { /* * If we are here, it means the completed file already exists. In almost * every case this means the user is violating an assumption of Flume @@ -277,8 +313,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { * file was already rolled but the rename was not atomic. If that seems * likely, we let it pass with only a warning. */ - if (Files.equal(currentFile.get().getFile(), destination)) { - logger.warn("Completed file " + newPath + + if (Files.equal(currentFile.get().getFile(), dest)) { + logger.warn("Completed file " + dest + " already exists, but files match, so continuing."); boolean deleted = fileToRoll.delete(); if (!deleted) { @@ -287,21 +323,21 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } else { String message = "File name has been re-used with different" + - " files. Spooling assumptions violated for " + newPath; + " files. Spooling assumptions violated for " + dest; throw new IllegalStateException(message); } // Dest file exists and not on windows - } else if (destination.exists()) { + } else if (dest.exists()) { String message = "File name has been re-used with different" + - " files. Spooling assumptions violated for " + newPath; + " files. Spooling assumptions violated for " + dest; throw new IllegalStateException(message); // Destination file does not already exist. We are good to go! } else { - boolean renamed = fileToRoll.renameTo(new File(newPath)); + boolean renamed = fileToRoll.renameTo(dest); if (renamed) { - logger.debug("Successfully rolled file {} to {}", fileToRoll, newPath); + logger.debug("Successfully rolled file {} to {}", fileToRoll, dest); // now we no longer need the meta file deleteMetaFile(); @@ -309,7 +345,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { /* If we are here then the file cannot be renamed for a reason other * than that the destination file exists (actually, that remains * possible w/ small probability due to TOC-TOU conditions).*/ - String message = "Unable to move " + currPath + " to " + newPath + + String message = "Unable to move " + fileToRoll + " to " + dest + ". This will likely cause duplicate events. Please verify that " + "flume has sufficient permissions to perform these operations."; throw new FlumeException(message); @@ -318,6 +354,22 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } /** + * Delete the given spooled file + * @param fileToDelete + * @throws IOException + */ + private void deleteCurrentFile(File fileToDelete) throws IOException { + logger.info("Preparing to delete file {}", fileToDelete); + if (!fileToDelete.exists()) { + logger.warn("Unable to delete nonexistent file: {}", fileToDelete); + return; + } + if (!fileToDelete.delete()) { + throw new IOException("Unable to delete spool file: " + fileToDelete); + } + } + + /** * Find and open the oldest file in the chosen directory. If two or more * files are equally old, the file name with lower lexicographical value is * returned. If the directory is empty, this will return an absent option. @@ -336,7 +388,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return true; } }; - List<File> candidateFiles = Arrays.asList(directory.listFiles(filter)); + List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); if (candidateFiles.isEmpty()) { return Optional.absent(); } else { @@ -412,4 +464,85 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { public File getFile() { return file; } } + @InterfaceAudience.Private + @InterfaceStability.Unstable + static enum DeletePolicy { + NEVER, + IMMEDIATE, + DELAY + } + + /** + * Special builder class for ReliableSpoolingFileEventReader + */ + public static class Builder { + private File spoolDirectory; + private String completedSuffix = + SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX; + private String ignorePattern = + SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT; + private String trackerDirPath = + SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR; + private Boolean annotateFileName = + SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER; + private String fileNameHeader = + SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY; + private String deserializerType = + SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER; + private Context deserializerContext = new Context(); + private String deletePolicy = + SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY; + + public Builder spoolDirectory(File directory) { + this.spoolDirectory = directory; + return this; + } + + public Builder completedSuffix(String completedSuffix) { + this.completedSuffix = completedSuffix; + return this; + } + + public Builder ignorePattern(String ignorePattern) { + this.ignorePattern = ignorePattern; + return this; + } + + public Builder trackerDirPath(String trackerDirPath) { + this.trackerDirPath = trackerDirPath; + return this; + } + + public Builder annotateFileName(Boolean annotateFileName) { + this.annotateFileName = annotateFileName; + return this; + } + + public Builder fileNameHeader(String fileNameHeader) { + this.fileNameHeader = fileNameHeader; + return this; + } + + public Builder deserializerType(String deserializerType) { + this.deserializerType = deserializerType; + return this; + } + + public Builder deserializerContext(Context deserializerContext) { + this.deserializerContext = deserializerContext; + return this; + } + + public Builder deletePolicy(String deletePolicy) { + this.deletePolicy = deletePolicy; + return this; + } + + public ReliableSpoolingFileEventReader build() throws IOException { + return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix, + ignorePattern, trackerDirPath, annotateFileName, fileNameHeader, + deserializerType, deserializerContext, deletePolicy); + } + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/750809c7/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 167193c..552bd48 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -52,9 +52,10 @@ Configurable, EventDrivenSource { private String fileHeaderKey; private int batchSize; private String ignorePattern; - private File metaDirectory; + private String trackerDirPath; private String deserializerType; private Context deserializerContext; + private String deletePolicy; private CounterGroup counterGroup; ReliableSpoolingFileEventReader reader; @@ -70,9 +71,17 @@ Configurable, EventDrivenSource { File directory = new File(spoolDirectory); try { - reader = new ReliableSpoolingFileEventReader(directory, completedSuffix, - ignorePattern, metaDirectory, fileHeader, fileHeaderKey, - deserializerType, deserializerContext); + reader = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(directory) + .completedSuffix(completedSuffix) + .ignorePattern(ignorePattern) + .trackerDirPath(trackerDirPath) + .annotateFileName(fileHeader) + .fileNameHeader(fileHeaderKey) + .deserializerType(deserializerType) + .deserializerContext(deserializerContext) + .deletePolicy(deletePolicy) + .build(); } catch (IOException ioe) { throw new FlumeException("Error instantiating spooling event parser", ioe); @@ -99,6 +108,7 @@ Configurable, EventDrivenSource { completedSuffix = context.getString(SPOOLED_FILE_SUFFIX, DEFAULT_SPOOLED_FILE_SUFFIX); + deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY); fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER); fileHeaderKey = context.getString(FILENAME_HEADER_KEY, @@ -106,17 +116,8 @@ Configurable, EventDrivenSource { batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE); - ignorePattern = context.getString(IGNORE_PAT, DFLT_IGNORE_PAT); - String metaDirLoc = context.getString(META_DIR, DEFAULT_META_DIR); - - // if absolute path, treat as absolute - if (metaDirLoc.charAt(0) == '/') { - metaDirectory = new File(metaDirLoc); - - // if relative path, treat as relative to spool directory - } else { - metaDirectory = new File(spoolDirectory, DEFAULT_META_DIR); - } + ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT); + trackerDirPath = context.getString(META_DIR, DEFAULT_META_DIR); deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER); deserializerContext = new Context(context.getSubProperties(DESERIALIZER + http://git-wip-us.apache.org/repos/asf/flume/blob/750809c7/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java index 67549e0..afc7288 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java @@ -47,7 +47,7 @@ public class SpoolDirectorySourceConfigurationConstants { /** Pattern of files to ignore */ public static final String IGNORE_PAT = "ignorePattern"; - public static final String DFLT_IGNORE_PAT = "^$"; // no effect + public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect /** Directory to store metadata about files being processed */ public static final String META_DIR = "metaDir"; @@ -56,4 +56,7 @@ public class SpoolDirectorySourceConfigurationConstants { /** Deserializer to use to parse the file data into Flume Events */ public static final String DESERIALIZER = "deserializer"; public static final String DEFAULT_DESERIALIZER = "LINE"; + + public static final String DELETE_POLICY = "deletePolicy"; + public static final String DEFAULT_DELETE_POLICY = "never"; } http://git-wip-us.apache.org/repos/asf/flume/blob/750809c7/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java new file mode 100644 index 0000000..a29606e --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.client.avro; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import junit.framework.Assert; +import org.apache.flume.Event; +import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants; +import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.util.List; + +public class TestReliableSpoolingFileEventReader { + + private static final Logger logger = LoggerFactory.getLogger + (TestReliableSpoolingFileEventReader.class); + + private static final File WORK_DIR = new File("target/test/work/" + + TestReliableSpoolingFileEventReader.class.getSimpleName()); + + @Before + public void setup() throws IOException, InterruptedException { + if (!WORK_DIR.isDirectory()) { + Files.createParentDirs(new File(WORK_DIR, "dummy")); + } + + // write out a few files + for (int i = 0; i < 4; i++) { + File fileName = new File(WORK_DIR, "file"+i); + StringBuilder sb = new StringBuilder(); + + // write as many lines as the index of the file + for (int j = 0; j < i; j++) { + sb.append("file" + i + "line" + j + "\n"); + } + Files.write(sb.toString(), fileName, Charsets.UTF_8); + } + Thread.sleep(1500L); // make sure timestamp is later + Files.write("\n", new File(WORK_DIR, "emptylineFile"), Charsets.UTF_8); + } + + @After + public void tearDown() { + + // delete all the files & dirs we created + File[] files = WORK_DIR.listFiles(); + for (File f : files) { + if (f.isDirectory()) { + File[] subDirFiles = f.listFiles(); + for (File sdf : subDirFiles) { + if (!sdf.delete()) { + logger.warn("Cannot delete file {}", sdf.getAbsolutePath()); + } + } + if (!f.delete()) { + logger.warn("Cannot delete directory {}", f.getAbsolutePath()); + } + } else { + if (!f.delete()) { + logger.warn("Cannot delete file {}", f.getAbsolutePath()); + } + } + } + if (!WORK_DIR.delete()) { + logger.warn("Cannot delete work directory {}", WORK_DIR.getAbsolutePath()); + } + + } + + // FIXME: implement ignore pattern test + @Ignore + @Test + public void testIgnorePattern() { + ReliableSpoolingFileEventReader parser; + } + + @Test + public void testRepeatedCallsWithCommitAlways() throws IOException { + ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR).build(); + + final int expectedLines = 0 + 1 + 2 + 3 + 1; + int seenLines = 0; + for (int i = 0; i < 10; i++) { + List<Event> events = reader.readEvents(10); + seenLines += events.size(); + reader.commit(); + } + + Assert.assertEquals(expectedLines, seenLines); + } + + @Test + public void testRepeatedCallsWithCommitOnSuccess() throws IOException { + String trackerDirPath = + SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR; + File trackerDir = new File(WORK_DIR, trackerDirPath); + + ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build(); + + final int expectedLines = 0 + 1 + 2 + 3 + 1; + int seenLines = 0; + for (int i = 0; i < 10; i++) { + List<Event> events = reader.readEvents(10); + int numEvents = events.size(); + if (numEvents > 0) { + seenLines += numEvents; + reader.commit(); + + // ensure that there are files in the trackerDir + File[] files = trackerDir.listFiles(); + Assert.assertNotNull(files); + Assert.assertTrue("Expected tracker files in tracker dir " + trackerDir + .getAbsolutePath(), files.length > 0); + } + } + + Assert.assertEquals(expectedLines, seenLines); + } + + @Test + public void testFileDeletion() throws IOException { + ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR) + .deletePolicy(DeletePolicy.IMMEDIATE.name()) + .build(); + + List<File> before = listFiles(WORK_DIR); + Assert.assertEquals("Expected 5, not: " + before, 5, before.size()); + + List<Event> events; + do { + events = reader.readEvents(10); + reader.commit(); + } while (!events.isEmpty()); + + List<File> after = listFiles(WORK_DIR); + Assert.assertEquals("Expected 0, not: " + after, 0, after.size()); + } + + private static List<File> listFiles(File dir) { + List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter + () { + @Override + public boolean accept(File pathname) { + return !pathname.isDirectory(); + } + })); + return files; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/750809c7/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java index bc10243..ac046a9 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java @@ -26,6 +26,7 @@ import com.google.common.io.Files; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.serialization.LineDeserializer; +import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -43,9 +44,9 @@ public class TestSpoolingFileLineReader { Logger logger = LoggerFactory.getLogger(TestSpoolingFileLineReader.class); - private static String completedSuffix = ".COMPLETE"; + private static String completedSuffix = + SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX; private static int bufferMaxLineLength = 500; - private static int bufferMaxLines = 30; private File tmpDir; @@ -66,8 +67,11 @@ public class TestSpoolingFileLineReader { ctx.put(LineDeserializer.MAXLINE_KEY, Integer.toString(maxLineLength)); ReliableSpoolingFileEventReader parser; try { - parser = new ReliableSpoolingFileEventReader(tmpDir, completedSuffix, "^$", - new File(tmpDir, ".flumespool"), false, "^$", "LINE", ctx); + parser = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(tmpDir) + .completedSuffix(completedSuffix) + .deserializerContext(ctx) + .build(); } catch (IOException ioe) { throw Throwables.propagate(ioe); } http://git-wip-us.apache.org/repos/asf/flume/blob/750809c7/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java deleted file mode 100644 index abc1827..0000000 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.serialization; - -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import com.google.common.io.Files; -import junit.framework.Assert; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.client.avro.ReliableEventReader; -import org.apache.flume.client.avro.ReliableSpoolingFileEventReader; -import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.List; - -public class TestReliableSpoolingFileEventReader { - - private static final Logger logger = LoggerFactory.getLogger - (TestReliableSpoolingFileEventReader.class); - - private static final File WORK_DIR = new File("target/test/work/" + - TestReliableSpoolingFileEventReader.class.getSimpleName()); - - @Before - public void setup() throws IOException, InterruptedException { - if (!WORK_DIR.isDirectory()) { - Files.createParentDirs(new File(WORK_DIR, "dummy")); - } - - // write out a few files - for (int i = 0; i < 4; i++) { - File fileName = new File(WORK_DIR, "file"+i); - StringBuilder sb = new StringBuilder(); - - // write as many lines as the index of the file - for (int j = 0; j < i; j++) { - sb.append("file" + i + "line" + j + "\n"); - } - Files.write(sb.toString(), fileName, Charsets.UTF_8); - } - Thread.sleep(1500L); // make sure timestamp is later - Files.write("\n", new File(WORK_DIR, "emptylineFile"), Charsets.UTF_8); - } - - @After - public void tearDown() { - - // delete all the files & dirs we created - File[] files = WORK_DIR.listFiles(); - for (File f : files) { - if (f.isDirectory()) { - File[] subDirFiles = f.listFiles(); - for (File sdf : subDirFiles) { - if (!sdf.delete()) { - logger.warn("Cannot delete file {}", sdf.getAbsolutePath()); - } - } - if (!f.delete()) { - logger.warn("Cannot delete directory {}", f.getAbsolutePath()); - } - } else { - if (!f.delete()) { - logger.warn("Cannot delete file {}", f.getAbsolutePath()); - } - } - } - if (!WORK_DIR.delete()) { - logger.warn("Cannot delete work directory {}", WORK_DIR.getAbsolutePath()); - } - - } - - // FIXME: implement ignore pattern test - @Ignore - @Test - public void testIgnorePattern() { - ReliableSpoolingFileEventReader parser; - } - - @Test - public void testRepeatedCallsWithCommitAlways() throws IOException { - File trackerDir = new File(WORK_DIR, - SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR); - ReliableEventReader reader = new ReliableSpoolingFileEventReader(WORK_DIR, - SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX, - SpoolDirectorySourceConfigurationConstants.DFLT_IGNORE_PAT, - trackerDir, false, "file", - SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER, - new Context()); - - final int expectedLines = 0 + 1 + 2 + 3 + 1; - int seenLines = 0; - for (int i = 0; i < 10; i++) { - List<Event> events = reader.readEvents(10); - seenLines += events.size(); - reader.commit(); - } - - Assert.assertEquals(expectedLines, seenLines); - } - - @Test - public void testRepeatedCallsWithCommitOnSuccess() throws IOException { - File trackerDir = new File(WORK_DIR, - SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR); - ReliableEventReader reader = new ReliableSpoolingFileEventReader(WORK_DIR, - SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX, - SpoolDirectorySourceConfigurationConstants.DFLT_IGNORE_PAT, - trackerDir, false, "file", - SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER, - new Context()); - - final int expectedLines = 0 + 1 + 2 + 3 + 1; - int seenLines = 0; - for (int i = 0; i < 10; i++) { - List<Event> events = reader.readEvents(10); - int numEvents = events.size(); - if (numEvents > 0) { - seenLines += numEvents; - reader.commit(); - - // ensure that there are files in the trackerDir - File[] files = trackerDir.listFiles(); - Assert.assertNotNull(files); - Assert.assertTrue("Expected tracker files in tracker dir " + trackerDir - .getAbsolutePath(), files.length > 0); - } - } - - Assert.assertEquals(expectedLines, seenLines); - } - -}
