> On Oct. 7, 2012, 4:30 a.m., Mike Percy wrote:
> > Sorry for taking so long to get back to this. I read through the 
> > code again, and I figured out what was throwing me off the first time. The 
> > way this is structured, there is a polling check-and-sleep mechanism built 
> > into the SpoolingFileLineReader, causing it to block, as well as a 
> > start/stop mechanism. This makes the SpoolingFileLineReader pretty 
> > complicated and, in my opinion, a bit tricky to understand. I think it 
> > would be great to refactor this code so that the looping mechanism was done 
> > via ScheduledExecutorService.scheduleWithFixedDelay at the top level of the 
> > Source and/or Client (actually, an Executor is already there in the 
> > Source), allowing the SpoolingFileLineReader to focus more on reading files 
> > and less on maintaining state. This should also remove the need for the 
> > ReaderStoppedException.
> > 
> > Another thing to consider is error conditions. What happens if 
> > processEvents() throws a ChannelException? This can happen if the channel 
> > is full. In that case, we should back off a bit, and then retry pushing the 
> > same events on the next run. In the same vein, we should not roll a file 
> > until we are certain all events have successfully been committed to a 
> > channel.
> > 
> > To clarify what I'm suggesting, this is an example of how it could be 
> > refactored:
> > 
> > SpoolDirectorySource.start:
> >   scheduledExecutorService.scheduleWithFixedDelay(
> >       new SpoolRunnable(new SpoolingFileLineReader(...)),
> >       0L, POLL_SLEEP_MS, TimeUnit.MILLISECONDS);
> > 
> > SpoolRunnable.run:
> >   while (true) {
> >     lines = reader.readLines(batchSize);
> >     if (lines == null) return;
> >     for (line in lines) { events.add(createEvent(line)); }
> >     try {
> >       channelProcessor.processEvents(events);
> >       reader.commit();
> >     } catch (ChannelException ex) { /* ... */ }
> >   }
> > 
> > SpoolingFileLineReader.readLines:
> >   If no commit since last readLines() call, then return the previous
> >   result from a saved buffer. Otherwise, read, save to a buffer, and then
> >   return lines from an available file if possible. If no files are
> >   available, return null.
> > 
> > SpoolingFileLineReader.commit:
> >   Clear the saved buffer. If the current file is at EOF, then retire the
> >   file.
> > 
> > Thanks for all the hard work. Let's take this to the finish line.
> 
> Patrick Wendell wrote:
>     In general - I like this pattern much better than what is there now.
>     
>     One question though - this assumes that a given SpoolingFileLineReader 
>     will only be accessed from a single thread. If several threads are calling 
>     readLines() and commit() it will mess with the semantics of commit(). In the 
>     current model you can have multiple threads reading lines interchangeably (I 
>     think - though I haven't fully fleshed this out in my mind).
>     
>     I think not being thread-safe is fine though, given the way that this 
>     hooks into a source - where it will only be accessed by one thread.

Agreed - as we discussed on #flume IRC, it seems reasonable to simplify this 
and only support a single thread reading each file at a time. This fits with 
the Sink threading model.
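
For reference, the single-threaded read/commit pattern discussed above could look roughly like the sketch below. It is not code from the patch: CommittingLineReader, ChannelFullException, and the Consumer-based channel are hypothetical stand-ins (for SpoolingFileLineReader, ChannelException, and the channel processor), and "files" are modelled as in-memory lists of lines so that the example is self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative sketch only; every name in here is an assumption, not the patch's API.
class SpoolSketch {

  /** Stand-in for the exception thrown when the channel is full. */
  static class ChannelFullException extends RuntimeException {}

  /** Not thread-safe by design: one thread calls readLines() and commit(). */
  static class CommittingLineReader {
    private final Deque<List<String>> pending = new ArrayDeque<>();
    private List<String> current;      // file currently being read, or null
    private int position;              // index of the next unread line in current
    private List<String> uncommitted;  // last batch handed out but not yet committed
    private int retiredFiles;

    CommittingLineReader(List<List<String>> files) {
      pending.addAll(files);
    }

    /** Returns the uncommitted batch again if there is one, else a new batch, else null. */
    List<String> readLines(int batchSize) {
      if (uncommitted != null) {
        return uncommitted;            // no commit since the last call: replay the batch
      }
      if (current == null) {
        current = pending.poll();
        position = 0;
        if (current == null) {
          return null;                 // no files available
        }
      }
      int end = Math.min(position + batchSize, current.size());
      uncommitted = new ArrayList<>(current.subList(position, end));
      position = end;
      return uncommitted;
    }

    /** Clears the saved batch and retires the current file once it is at EOF. */
    void commit() {
      if (uncommitted == null) {
        return;
      }
      uncommitted = null;
      if (current != null && position >= current.size()) {
        retiredFiles++;                // a real reader would rename the file here
        current = null;
      }
    }

    int retiredFiles() {
      return retiredFiles;
    }
  }

  /** One polling pass: drain available lines, stop early if the channel rejects a batch. */
  static class SpoolRunnable implements Runnable {
    private final CommittingLineReader reader;
    private final Consumer<List<String>> channel;
    private final int batchSize;

    SpoolRunnable(CommittingLineReader reader, Consumer<List<String>> channel, int batchSize) {
      this.reader = reader;
      this.channel = channel;
      this.batchSize = batchSize;
    }

    @Override
    public void run() {
      List<String> lines;
      while ((lines = reader.readLines(batchSize)) != null) {
        try {
          channel.accept(lines);
          reader.commit();
        } catch (ChannelFullException ex) {
          // Back off: returning lets the fixed delay act as the pause, and the
          // uncommitted batch is replayed on the next run. Catching here matters
          // because an exception escaping run() suppresses later executions.
          return;
        }
      }
    }
  }

  /** Schedules the task on a single thread, polling every pollMs milliseconds. */
  static ScheduledExecutorService start(SpoolRunnable task, long pollMs) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleWithFixedDelay(task, 0L, pollMs, TimeUnit.MILLISECONDS);
    return executor;
  }
}
```

Because a file is only retired inside commit(), a failed push leaves both the batch and the file in place, which is what gives the retry its at-least-once behaviour in this sketch. The caller of start() is responsible for shutting the returned executor down.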


- Mike


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6377/#review12210
-----------------------------------------------------------


On Aug. 14, 2012, 10:02 p.m., Patrick Wendell wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6377/
> -----------------------------------------------------------
> 
> (Updated Aug. 14, 2012, 10:02 p.m.)
> 
> 
> Review request for Flume.
> 
> 
> Description
> -------
> 
> This patch adds a spooling directory based source. The idea is that a user 
> can have a spool directory where files are deposited for ingestion into 
> flume. Once ingested, the files are clearly renamed and the implementation 
> guarantees at-least-once delivery semantics similar to those achieved within 
> flume itself, even across failures and restarts of the JVM running the code.
> 
> This helps fill the gap for people who want a way to get reliable delivery of 
> events into flume, but don't want to directly write their application against 
> the flume API. They can simply drop log files off in a spooldir and let flume 
> ingest asynchronously (using some shell scripts or other automated process).
> 
> Unlike the prior iteration, this patch implements a first-class source. It 
> also extends the avro client to support spooling in a similar manner.
> 
> 
> This addresses bug FLUME-1425.
>     https://issues.apache.org/jira/browse/FlUME-1425
> 
> 
> Diffs
> -----
> 
>   flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java da804d7 
>   flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java abbbf1c 
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java 4a5ecae 
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java PRE-CREATION 
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java PRE-CREATION 
>   flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java PRE-CREATION 
>   flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java PRE-CREATION 
>   flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java PRE-CREATION 
>   flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java PRE-CREATION 
>   flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java PRE-CREATION 
>   flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java PRE-CREATION 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst 45dd7cc 
> 
> Diff: https://reviews.apache.org/r/6377/diff/
> 
> 
> Testing
> -------
> 
> Extensive unit tests and I also built and played with this using a stub flume 
> agent. If you look at the JIRA I have a configuration file for an agent that 
> will print out Avro events to the command line - that's helpful when testing 
> this.
> 
> 
> Thanks,
> 
> Patrick Wendell
> 
>
