-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6377/#review12210
-----------------------------------------------------------
Sorry it took me so long to get back to this. I read through the code again,
and I figured out what was throwing me off the first time. As structured, the
SpoolingFileLineReader has both a polling check-and-sleep mechanism built in,
which causes it to block, and a start/stop mechanism. This makes the
SpoolingFileLineReader fairly complicated and, in my opinion, a bit tricky to
understand. I think it would be great to refactor this code so that the looping
is done via ScheduledExecutorService.scheduleWithFixedDelay at the top level of
the Source and/or Client (an Executor is actually already there in the Source),
allowing the SpoolingFileLineReader to focus on reading files rather than on
maintaining state. This should also remove the need for the
ReaderStoppedException.
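A minimal sketch of the scheduling idea, using only java.util.concurrent.
PollingSourceSketch, POLL_SLEEP_MS and the 5-second shutdown timeout are
hypothetical names and values, not code from the patch; the Runnable stands in
for one non-blocking pass of the reader over the spool directory.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the source owns the polling loop; the reader does not
// sleep, block, or track started/stopped state itself.
class PollingSourceSketch {
  // Hypothetical poll interval; a real source would read this from its config.
  static final long POLL_SLEEP_MS = 50L;

  private final Runnable pass;  // one non-blocking pass over the spool directory
  private ScheduledExecutorService executor;

  PollingSourceSketch(Runnable pass) {
    this.pass = pass;
  }

  void start() {
    executor = Executors.newSingleThreadScheduledExecutor();
    // Each delay is measured from the end of one pass to the start of the
    // next, so passes never overlap. If a pass throws, all later passes are
    // suppressed, so the Runnable must catch and log its own exceptions.
    executor.scheduleWithFixedDelay(pass, 0L, POLL_SLEEP_MS, TimeUnit.MILLISECONDS);
  }

  void stop() throws InterruptedException {
    if (executor == null) {
      return;  // never started
    }
    // Shutting down the executor cancels the periodic task, so the reader
    // needs no stop flag and no ReaderStoppedException.
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

Because the delay is fixed between the end of one run and the start of the
next, a slow pass (e.g. a large file) simply pushes the next poll back instead
of piling up concurrent runs.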
Another thing to consider is error conditions. What happens if processEvents()
throws a ChannelException? This can happen if the channel is full. In that
case, we should back off a bit and then retry pushing the same events on the
next run. In the same vein, we should not roll a file until we are certain that
all of its events have been successfully committed to a channel.
To clarify what I'm suggesting, here is an example of how it could be
refactored:
SpoolDirectorySource.start:
  scheduledExecutorService.scheduleWithFixedDelay(new SpoolRunnable(
      new SpoolingFileLineReader(...)), 0L, POLL_SLEEP_MS, TimeUnit.MILLISECONDS);
SpoolRunnable.run:
  while (true) {
    List<String> lines = reader.readLines(batchSize);
    if (lines == null) return;  // no files available; wait for the next run
    List<Event> events = new ArrayList<Event>();
    for (String line : lines) { events.add(createEvent(line)); }
    try { channelProcessor.processEvents(events); reader.commit(); }
    catch (ChannelException ex) {
      return;  // back off without committing; the same lines are replayed next run
    }
  }
SpoolingFileLineReader.readLines: If there has been no commit since the last
readLines() call, return the previous result from a saved buffer. Otherwise,
read lines from an available file, save them to the buffer, and return them.
If no files are available, return null.
SpoolingFileLineReader.commit: Clear the saved buffer. If the current file is
at EOF, retire it.
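The readLines()/commit() contract described above can be modeled in memory to
check the replay and roll semantics. SpoolReaderSketch and its SpoolFile holder
are hypothetical: a "file" here is just a name plus a list of lines, and
"retiring" it appends the name to a list where the real reader would rename
the file on disk.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of the proposed readLines()/commit() contract.
class SpoolReaderSketch {
  static final class SpoolFile {
    final String name;
    final List<String> lines;
    int position = 0;  // index of the next unread line

    SpoolFile(String name, List<String> lines) {
      this.name = name;
      this.lines = lines;
    }

    boolean atEof() {
      return position >= lines.size();
    }
  }

  private final Deque<SpoolFile> available = new ArrayDeque<>();
  final List<String> retired = new ArrayList<>();  // stands in for renaming on disk
  private SpoolFile current = null;
  private List<String> uncommitted = null;  // last batch handed out, not yet committed

  void addFile(String name, List<String> lines) {
    available.addLast(new SpoolFile(name, lines));
  }

  /** Up to batchSize lines; replays the last batch if uncommitted; null if no files. */
  List<String> readLines(int batchSize) {
    if (uncommitted != null) {
      return uncommitted;  // no commit since the last call: replay the saved buffer
    }
    if (current == null) {
      current = available.pollFirst();
    }
    if (current == null) {
      return null;  // no files available
    }
    List<String> batch = new ArrayList<>();
    while (batch.size() < batchSize && !current.atEof()) {
      batch.add(current.lines.get(current.position++));
    }
    uncommitted = batch;  // save before returning, so a failed push can be replayed
    return batch;
  }

  void commit() {
    uncommitted = null;
    // Retire only when everything read from the file has been committed.
    if (current != null && current.atEof()) {
      retired.add(current.name);
      current = null;
    }
  }
}
```

Because a file is retired only inside commit(), a crash or ChannelException
between readLines() and commit() leaves the file in place, which is what gives
the at-least-once behavior (lines may be re-delivered, but not lost).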
Thanks for all the hard work. Let's take this to the finish line.
- Mike Percy
On Aug. 14, 2012, 10:02 p.m., Patrick Wendell wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6377/
> -----------------------------------------------------------
>
> (Updated Aug. 14, 2012, 10:02 p.m.)
>
>
> Review request for Flume.
>
>
> Description
> -------
>
> This patch adds a spooling directory based source. The idea is that a user
> can have a spool directory where files are deposited for ingestion into
> Flume. Once ingested, the files are clearly renamed and the implementation
> guarantees at-least-once delivery semantics similar to those achieved within
> Flume itself, even across failures and restarts of the JVM running the code.
>
> This helps fill the gap for people who want a way to get reliable delivery of
> events into Flume, but don't want to write their application directly against
> the Flume API. They can simply drop log files off in a spooldir and let Flume
> ingest them asynchronously (using shell scripts or some other automated process).
>
> Unlike the prior iteration, this patch implements a first-class source. It
> also extends the Avro client to support spooling in a similar manner.
>
>
> This addresses bug FLUME-1425.
> https://issues.apache.org/jira/browse/FlUME-1425
>
>
> Diffs
> -----
>
> flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java da804d7
> flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java abbbf1c
> flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java 4a5ecae
> flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java PRE-CREATION
> flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java PRE-CREATION
> flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java PRE-CREATION
> flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java PRE-CREATION
> flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java PRE-CREATION
> flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java PRE-CREATION
> flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java PRE-CREATION
> flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java PRE-CREATION
> flume-ng-doc/sphinx/FlumeUserGuide.rst 45dd7cc
>
> Diff: https://reviews.apache.org/r/6377/diff/
>
>
> Testing
> -------
>
> Extensive unit tests; I also built and played with this using a stub Flume
> agent. The JIRA has a configuration file for an agent that prints Avro events
> to the command line, which is helpful when testing this.
>
>
> Thanks,
>
> Patrick Wendell
>
>