-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21581/#review43489
-----------------------------------------------------------



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
<https://reviews.apache.org/r/21581/#comment77641>

    Make this a private def, since it is an implementation detail.



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77643>

    Can we move these variables to the constructor as default parameters? We 
can then update the FileReaderSystemFactory to get values out of the config 
object, and pass them in here.
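
    A minimal sketch of what I mean, not the actual patch. The class and 
object names, the default values, and the use of a plain Map in place of the 
Samza config object are all assumptions for illustration. The config keys 
are the ones suggested in the FileReaderSystemFactory comment below.

```scala
// Hypothetical stand-in for the consumer; names and default values are assumptions.
class FileReaderConsumerSketch(
    val queueSize: Int = 10000,
    val pollingSleepMs: Int = 500)

object FileReaderFactorySketch {
  // A plain Map stands in for the Samza config object in this sketch.
  def getConsumer(systemName: String, config: Map[String, String]): FileReaderConsumerSketch = {
    val defaults = new FileReaderConsumerSketch()
    // Fall back to the constructor defaults when a key is not configured.
    val queueSize = config.get(s"systems.$systemName.queue.size")
      .map(_.toInt).getOrElse(defaults.queueSize)
    val pollingSleepMs = config.get(s"systems.$systemName.polling.sleep.ms")
      .map(_.toInt).getOrElse(defaults.pollingSleepMs)
    new FileReaderConsumerSketch(queueSize, pollingSleepMs)
  }
}
```

    Tests can then construct the consumer directly with small values, while 
jobs keep the defaults unless the config overrides them.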



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77644>

    Include unit of time in variable name and docs. Milliseconds, I'm assuming?



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77661>

    Should use DaemonThreadFactory as second parameter here. This is a 
samza-core class that creates daemon threads in the executor, and names threads 
properly.



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77645>

    Maybe call this "shutdown", and use while (!shutdown) in the while loop?



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77648>

    Probably want to surround everything in the loop in a try/catch, so we can 
properly catch InterruptedException, shut down the loop, and close the file 
handle (so we don't leak it).
    
    You can then set var file = null above, and just change the if to if file 
== null || file.length <= filePointer. 
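
    Roughly what I have in mind, as a sketch only. The class name, the 
constructor parameters (path, pollingSleepMs, onByte), and the byte-at-a-time 
read are assumptions; the real loop would build lines and enqueue envelopes 
instead of calling onByte.

```scala
import java.io.RandomAccessFile

// Hypothetical reader loop; all names here are assumptions for illustration.
class ReaderLoopSketch(path: String, pollingSleepMs: Long, onByte: Int => Unit) extends Runnable {
  @volatile var shutdown = false

  def run(): Unit = {
    var file: RandomAccessFile = null
    var filePointer = 0L
    try {
      while (!shutdown) {
        if (file == null || file.length() <= filePointer) {
          // Caught up (or not opened yet): close the old handle before
          // opening a new one, so no file handle is leaked.
          if (file != null) {
            file.close()
            Thread.sleep(pollingSleepMs)
          }
          file = new RandomAccessFile(path, "r")
        } else {
          file.seek(filePointer)
          onByte(file.read())
          filePointer = file.getFilePointer()
        }
      }
    } catch {
      case _: InterruptedException =>
        // Stop the loop and restore the interrupt flag for the caller.
        shutdown = true
        Thread.currentThread().interrupt()
    } finally {
      // Runs on normal shutdown, interrupt, or any other exception.
      if (file != null) file.close()
    }
  }
}
```

    The finally block is what guarantees the handle is closed no matter how 
the loop exits.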



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77647>

    Should close file before creating a new one so we don't leak file handles.



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77666>

    It looks like you're putting the current value of i into the offset field, 
which is the offset of the last byte read. The offset should represent the 
point in the file that we need to seek to begin reading this message (i.e. the 
offset of the first byte read for this message). I think we need a second 
variable that tracks the beginning offset for the current message (i + 1 when 
cha == '\n'). We should use this variable when creating the 
IncomingMessageEnvelope, and then reset it when we reset the line variable.
    
    This change also requires a re-implementation of getOffsetsAfter. We'll 
need to seek to the supplied offset, then read through to the next newline, 
then return the newline's offset + 1. In the case where there is no next 
newline, we'll have to throw an exception. This case should never happen in 
getOffsetsAfter's current usage, since we always call getOffsetsAfter on 
messages that we've fully consumed (i.e. there was a newline AFTER the message).
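
    To make the two points above concrete, here is a rough sketch. It is 
not the patch's code: the object and method names (LineOffsetsSketch, 
readMessages, offsetAfter) and the messageStart variable are hypothetical, 
offsets are kept as Long rather than the string offsets Samza uses, and it 
assumes single-byte characters.

```scala
import java.io.RandomAccessFile

// Hypothetical helper; names are assumptions, not part of the patch.
object LineOffsetsSketch {
  // Pairs each newline-terminated message with the offset of its FIRST byte.
  def readMessages(path: String): List[(Long, String)] = {
    val file = new RandomAccessFile(path, "r")
    try {
      var messages = List.empty[(Long, String)]
      var line = new StringBuilder
      var messageStart = 0L // beginning offset of the current message
      var i = 0L            // offset of the byte being read
      val length = file.length()
      while (i < length) {
        val cha = file.read().toChar // assumes single-byte characters
        if (cha == '\n') {
          messages = (messageStart, line.toString) :: messages
          line = new StringBuilder
          messageStart = i + 1 // next message begins right after the newline
        } else {
          line.append(cha)
        }
        i += 1
      }
      messages.reverse
    } finally {
      file.close()
    }
  }

  // Seeks to a message's offset and returns the offset just past its newline.
  def offsetAfter(path: String, offset: Long): Long = {
    val newline = '\n'.toInt
    val file = new RandomAccessFile(path, "r")
    try {
      file.seek(offset)
      var b = file.read()
      while (b != -1 && b != newline) b = file.read()
      if (b == -1) {
        throw new IllegalStateException("No newline after offset " + offset)
      }
      file.getFilePointer() // newline's offset + 1
    } finally {
      file.close()
    }
  }
}
```

    With this scheme, seeking to a message's offset always lands on its 
first byte, and offsetAfter of one message equals the offset of the next.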



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
<https://reviews.apache.org/r/21581/#comment77649>

    If you move the constant variables in FileReaderSystemConsumer (above) 
into the constructor, you can use the config object here to read their values 
and pass them in.
    
    For names, my vote would be for:
    
    systems.<system name>.queue.size
    systems.<system name>.polling.sleep.ms



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
<https://reviews.apache.org/r/21581/#comment77650>

    nit: suppose->supposed


- Chris Riccomini


On May 16, 2014, 9:15 p.m., Yan Fang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21581/
> -----------------------------------------------------------
> 
> (Updated May 16, 2014, 9:15 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> 1. moved from old request SAMZA-138
> 2. new patch based on Samza core repository
> 3. converted to Scala
> 4. added javadoc for explaining
> 5. FileReaderSystemAdmin implements SystemAdmin and provides metadata for a 
> file, including oldest offset, newest offset and upcoming offset.
> 6. FileReaderConsumer implements BlockingEnvelopeMap. It creates one thread 
> for each file-read operation. After reading existing messages, the thread 
> keeps checking the new messages coming to the file until the job is stopped.
> 7. FileReaderFactory implements SystemFactory.
> 8. related unit tests 
> 
> 
> Diffs
> -----
> 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/21581/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yan Fang
> 
>
