-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50378/#review143374
-----------------------------------------------------------

Hi,

Your change interferes with a caching mechanism. I believe the best way to describe the interference is the test below.

The problem in a nutshell: when a wildcard is specified in a directory name, the parent directory used for file matching is not the immediate parent of the files but a directory higher up. Its last modification time is therefore not updated when a new file is added. When file2.txt is added, only the mtime of dir1 is updated, but the cache is initialized with the fg1 directory, so it only monitors fg1 for changes.

I think either the documentation should state that caching does not work when wildcards are used in directory names, or, even better, an assertion should check for this combination at startup. (Making the cache work with wildcards would be best.)

@Test
public void testWildcardsDirFilteringCache() throws IOException, InterruptedException {
  // first iteration: everything works as expected
  File f1 = new File(tmpDir.getAbsolutePath() + "/fg1/dir1/file1.txt");
  Files.createParentDirs(f1);
  Files.write("file1\n", f1, Charsets.UTF_8);

  Context context = new Context();
  context.put(POSITION_FILE, posFilePath);
  context.put(FILE_GROUPS, "fg1");
  context.put(FILE_GROUPS_PREFIX + "fg1", tmpDir.getAbsolutePath() + "/fg1/*/file.*");

  Configurables.configure(source, context);
  source.start();
  source.process();
  Transaction txn = channel.getTransaction();
  txn.begin();
  List<String> out = Lists.newArrayList();
  for (int i = 0; i < 2; i++) {
    Event e = channel.take();
    if (e != null) {
      out.add(TestTaildirEventReader.bodyAsString(e));
    }
  }
  txn.commit();
  txn.close();

  // empty iterations simulating the passage of time
  Thread.sleep(1000);
  source.process();
  Thread.sleep(1000);

  // a file created after a while should be picked up as well
  File f2 = new File(tmpDir.getAbsolutePath() + "/fg1/dir1/file2.txt");
  Files.write("file2\n", f2, Charsets.UTF_8);
  source.process();
  txn = channel.getTransaction();
  txn.begin();
  for (int i = 0; i < 2; i++) {
    Event e = channel.take();
    if (e != null) {
      out.add(TestTaildirEventReader.bodyAsString(e));
    }
  }
  txn.commit();
  txn.close();

  assertEquals(2, out.size()); // fails: file2.txt never appears in the channel
  assertTrue(out.contains("file1"));
  assertTrue(out.contains("file2"));
}

- Attila Simon


On July 24, 2016, 10:37 a.m., qiao wen wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50378/
> -----------------------------------------------------------
> 
> (Updated July 24, 2016, 10:37 a.m.)
> 
> 
> Review request for Flume.
> 
> 
> Repository: flume-git
> 
> 
> Description
> -------
> 
> In our log management project, we want to track many log files like this:
> /app/dir1/log.*
> /app/dir2/log.*
> ...
> /app/dirn/log.*
> But TaildirSource does not support wildcards in the filegroup directory name.
> The following config is expected:
> a1.sources.r1.filegroups.fg = /app/*/log.*
> 
> 
> Diffs
> -----
> 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst 1334500 
>   flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java ad9f720 
>   flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java 097ee0b 
> 
> Diff: https://reviews.apache.org/r/50378/diff/
> 
> 
> Testing
> -------
> 
> All tests in TestTaildirSource passed.
> 
> 
> Thanks,
> 
> qiao wen
> 
>
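
The mtime behavior described in the review can be reproduced without Flume. The following is a minimal sketch, not code from the patch: the class name DirMtimeDemo and its methods are hypothetical, and it assumes a filesystem that updates a directory's mtime when an entry is added directly to it (the usual POSIX behavior). Both directories are first stamped with a known baseline time, then a second file is added under fg1/dir1, and the sketch reports which directory's mtime moved away from the baseline.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

public class DirMtimeDemo {

  // Arbitrary old timestamp (whole seconds) used as a baseline,
  // so that any later mtime update is easy to detect.
  private static final FileTime BASELINE = FileTime.fromMillis(1_000_000_000_000L);

  /**
   * Creates root/fg1/dir1/file1.txt, resets the mtime of fg1 and dir1 to
   * BASELINE, adds file2.txt to dir1, and returns
   * {fg1 mtime changed, dir1 mtime changed}.
   */
  static boolean[] mtimeChangedAfterAdd(Path root) {
    try {
      Path fg1 = root.resolve("fg1");
      Path dir1 = fg1.resolve("dir1");
      Files.createDirectories(dir1);
      Files.write(dir1.resolve("file1.txt"), "file1\n".getBytes(StandardCharsets.UTF_8));

      // Stamp both directories with the baseline time.
      Files.setLastModifiedTime(dir1, BASELINE);
      Files.setLastModifiedTime(fg1, BASELINE);

      // Add a new file to the immediate parent (dir1) only.
      Files.write(dir1.resolve("file2.txt"), "file2\n".getBytes(StandardCharsets.UTF_8));

      boolean fg1Changed = !Files.getLastModifiedTime(fg1).equals(BASELINE);
      boolean dir1Changed = !Files.getLastModifiedTime(dir1).equals(BASELINE);
      return new boolean[] {fg1Changed, dir1Changed};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Runs the check in a fresh temporary directory. */
  static boolean[] runInTempDir() {
    try {
      return mtimeChangedAfterAdd(Files.createTempDirectory("mtime-demo"));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    boolean[] changed = runInTempDir();
    System.out.println("fg1 mtime changed:  " + changed[0]);
    System.out.println("dir1 mtime changed: " + changed[1]);
  }
}
```

A cache keyed only on the mtime of fg1 would therefore not notice file2.txt, which is the situation the failing test above exercises.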