> On Oct. 7, 2012, 4:30 a.m., Mike Percy wrote:
> > Sorry for taking so long to get back to this. I read through the
> > code again, and I figured out what was throwing me off the first time. The
> > way this is structured, there is a polling check-and-sleep mechanism built
> > into the SpoolingFileLineReader, causing it to block, as well as a
> > start/stop mechanism. This makes the SpoolingFileLineReader pretty
> > complicated and, in my opinion, a bit tricky to understand. I think it
> > would be great to refactor this code so that the looping mechanism was done
> > via ScheduledExecutorService.scheduleWithFixedDelay at the top level of the
> > Source and/or Client (actually, an Executor is already there in the
> > Source), allowing the SpoolingFileLineReader to focus more on reading files
> > and less on maintaining state. This should also remove the need for the
> > ReaderStoppedException.
> >
> > Another thing to consider is error conditions. What happens if
> > processEvents() throws a ChannelException? This can happen if the channel
> > is full. In that case, we should back off a bit, and then retry pushing the
> > same events on the next run. In the same vein, we should not roll a file
> > until we are certain all events have successfully been committed to a
> > channel.
> >
> > To clarify what I'm suggesting, this is an example of how it could be
> > refactored:
> >
> > SpoolDirectorySource.start:
> >   scheduledExecutorService.scheduleWithFixedDelay(
> >       new SpoolRunnable(new SpoolingFileLineReader(...)),
> >       0L, POLL_SLEEP_MS, TimeUnit.MILLISECONDS);
> >
> > SpoolRunnable.run:
> >   while (true) {
> >     lines = reader.readLines(batchSize);
> >     if (lines == null) return;
> >     for (line in lines) {
> >       events.add(createEvent(line));
> >     }
> >     try {
> >       channelProcessor.processEvents(events);
> >       reader.commit();
> >     } catch (ChannelException ex) { /* ... */ }
> >   }
> >
> > SpoolingFileLineReader.readLines:
> >   If no commit since last readLines() call, then return the previous
> >   result from a saved buffer. Otherwise, read, save to a buffer, and then
> >   return lines from an available file if possible. If no files are
> >   available, return null.
> >
> > SpoolingFileLineReader.commit:
> >   Clear the saved buffer. If the current file is at EOF, then retire the
> >   file.
> >
> > Thanks for all the hard work. Let's take this to the finish line.
>
> Patrick Wendell wrote:
> In general - I like this pattern much better than what is there now.
>
> One question though - this assumes that a given SpoolingFileLineReader
> will only be accessed from a single thread. If several threads are calling
> readLines() and commit() it will mess with the semantics of commit(). In the
> current model you can have multiple threads reading lines interchangeably (I
> think - though I haven't fully fleshed this out in my mind).
>
> I think not being thread-safe is fine though, given the way that this
> hooks into a source - where it will only be accessed by one thread.
Agreed - as we discussed on #flume IRC, it seems reasonable to simplify this
and only support a single thread reading each file at a time. This fits with
the Sink threading model.
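
To make the readLines()/commit() contract concrete under that single-thread
assumption, here is a minimal sketch. The class name
CommitBufferedLineReader, the retiredCount() helper, and the in-memory lists
standing in for spool files are all hypothetical; this is not the code in the
patch, only an illustration of the buffering behavior described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical stand-in for SpoolingFileLineReader. Each "file" is an
 * in-memory list of lines (an assumption made to keep the sketch
 * self-contained). Not thread-safe: meant to be driven by one thread.
 */
class CommitBufferedLineReader {
    private final Deque<List<String>> pendingFiles = new ArrayDeque<>();
    private final List<List<String>> retiredFiles = new ArrayList<>();
    private List<String> currentFile;  // file being read, or null if none is open
    private int position;              // index of the next unread line in currentFile
    private List<String> uncommitted;  // last batch handed out and not yet committed

    CommitBufferedLineReader(List<List<String>> files) {
        pendingFiles.addAll(files);
    }

    /** Returns up to batchSize lines, or null when no file is available. */
    List<String> readLines(int batchSize) {
        if (uncommitted != null) {
            // No commit since the last call: replay the saved batch.
            return Collections.unmodifiableList(uncommitted);
        }
        if (currentFile == null) {
            currentFile = pendingFiles.poll();
            position = 0;
            if (currentFile == null) {
                return null; // nothing left to read
            }
        }
        int end = Math.min(position + batchSize, currentFile.size());
        uncommitted = new ArrayList<>(currentFile.subList(position, end));
        position = end;
        return Collections.unmodifiableList(uncommitted);
    }

    /** Clears the saved batch and retires the current file once it is at EOF. */
    void commit() {
        uncommitted = null;
        if (currentFile != null && position >= currentFile.size()) {
            retiredFiles.add(currentFile);
            currentFile = null;
        }
    }

    /** Number of files retired so far (hypothetical helper for inspection). */
    int retiredCount() {
        return retiredFiles.size();
    }
}
```

With this shape, a caller that hits a ChannelException simply skips commit()
and gets the same batch back on the next scheduled run, and a file is only
retired after its last batch has been committed.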
- Mike
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6377/#review12210
-----------------------------------------------------------
On Aug. 14, 2012, 10:02 p.m., Patrick Wendell wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6377/
> -----------------------------------------------------------
>
> (Updated Aug. 14, 2012, 10:02 p.m.)
>
>
> Review request for Flume.
>
>
> Description
> -------
>
> This patch adds a spooling directory based source. The idea is that a user
> can have a spool directory where files are deposited for ingestion into
> flume. Once ingested, the files are clearly renamed and the implementation
> guarantees at-least-once delivery semantics similar to those achieved within
> flume itself, even across failures and restarts of the JVM running the code.
>
> This helps fill the gap for people who want a way to get reliable delivery of
> events into flume, but don't want to directly write their application against
> the flume API. They can simply drop log files off in a spooldir and let flume
> ingest asynchronously (using some shell scripts or other automated process).
>
> Unlike the prior iteration, this patch implements a first-class source. It
> also extends the avro client to support spooling in a similar manner.
>
>
> This addresses bug FLUME-1425.
> https://issues.apache.org/jira/browse/FlUME-1425
>
>
> Diffs
> -----
>
>
>   flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java da804d7
>   flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java abbbf1c
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java 4a5ecae
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java PRE-CREATION
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java PRE-CREATION
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java PRE-CREATION
>   flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java PRE-CREATION
>   flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java PRE-CREATION
>   flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java PRE-CREATION
>   flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java PRE-CREATION
>   flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java PRE-CREATION
>   flume-ng-doc/sphinx/FlumeUserGuide.rst 45dd7cc
>
> Diff: https://reviews.apache.org/r/6377/diff/
>
>
> Testing
> -------
>
> I added extensive unit tests and also built and played with this using a stub flume
> agent. If you look at the JIRA I have a configuration file for an agent that
> will print out Avro events to the command line - that's helpful when testing
> this.
>
>
> Thanks,
>
> Patrick Wendell
>
>