[ 
https://issues.apache.org/jira/browse/FLUME-2437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14330009#comment-14330009
 ] 

Johny Rufus commented on FLUME-2437:
------------------------------------

Here is my implementation. I have been working on this on and off for some time; 
there is a lot of room for improvement and add-ons, but the initial version works 
for the base cases tested.

https://github.com/jrufus/flume/tree/s3

This implementation is based on the SpoolingDirectorySource logic, with additional 
modifications to support mark/reset on S3ObjectInputStream.
MapDB is used to store the processed object keys, to differentiate processed 
S3 objects from yet-to-be-processed ones. (This can be upgraded to ZooKeeper later.)

The code is experimental, so it needs more testing and pruning.

Tests tried manually so far:
------------------------------

1. Created multiple files in the S3 bucket - make sure the source processes all the 
files
2. Added more files after the S3 source started - make sure the newly created S3 
objects are processed
3. Stopped the source after a few files were processed - make sure that on restart, 
the source only processes the remaining unprocessed files
4. Stopped the source in the middle of processing a file - make sure the position 
tracker is read on restart and processing of the partial file continues from 
where it was last marked

Main classes:
--------------
ResettableGenericInputStream contains the core logic as a ResettableStream. 
markedBuffer is used to hold the data that has been read after the current mark, 
and tracker keeps track of the checkpointed positions in the file currently being 
processed, for later recovery (the same concept as ResettableFileInputStream, with 
enough modifications to work for streams that do not support mark and reset).
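
To make the idea concrete, here is a minimal standalone sketch of the markedBuffer 
concept for a stream with no native mark/reset support. This is only an illustration 
of the approach, not the actual class from the branch; it ignores the position 
tracker and buffer-size limits:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class BufferedMarkStream {
  private final InputStream in;              // underlying stream, no native mark/reset
  private final ByteArrayOutputStream markedBuffer = new ByteArrayOutputStream();  // bytes returned since the last mark
  private byte[] replay = new byte[0];       // bytes to re-serve after a reset()
  private int replayPos = 0;

  public BufferedMarkStream(InputStream in) {
    this.in = in;
  }

  public int read() throws IOException {
    // serve buffered bytes first after a reset(), then fall back to the underlying stream
    int b = (replayPos < replay.length) ? (replay[replayPos++] & 0xff) : in.read();
    if (b != -1) {
      markedBuffer.write(b);                 // remember it so a later reset() can replay it
    }
    return b;
  }

  public void mark() {
    markedBuffer.reset();                    // the real class would also checkpoint the tracker here
  }

  public void reset() throws IOException {
    ByteArrayOutputStream next = new ByteArrayOutputStream();
    next.write(markedBuffer.toByteArray());                     // everything returned since the mark...
    next.write(replay, replayPos, replay.length - replayPos);   // ...plus replay bytes not yet re-read
    replay = next.toByteArray();
    replayPos = 0;
    markedBuffer.reset();                    // re-recorded as the bytes are read again
  }
}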

StreamCreator is used by the ResettableGenericInputStream to create a new stream 
when the need arises (as part of seek, when there is a need to seek to positions 
not contained in the markedBuffer or the current buffer).
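
The shape of such an interface could be as simple as the following (illustrative 
only; the actual StreamCreator in the branch may have a different signature):

import java.io.IOException;
import java.io.InputStream;

public interface StreamCreator {
  /** Opens a fresh stream over the same underlying object, positioned at byte 0. */
  InputStream create() throws IOException;
}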
 
FileStreamCreator (more for testing/experimental purposes) and S3StreamCreator 
are passed to the ResettableGenericInputStream.
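
A hypothetical S3StreamCreator along those lines, implementing the interface 
sketched above with the AWS SDK for Java v1 (constructor arguments and names are 
assumptions, not the actual code):

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

public class S3StreamCreator implements StreamCreator {
  private final AmazonS3 s3;
  private final String bucketName;
  private final String key;

  public S3StreamCreator(String accessKey, String secretKey, String bucketName, String key) {
    this.s3 = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
    this.bucketName = bucketName;
    this.key = key;
  }

  @Override
  public InputStream create() throws IOException {
    // each call opens a new S3ObjectInputStream over the same object, so the
    // ResettableGenericInputStream can re-read from the beginning and skip forward
    return s3.getObject(bucketName, key).getObjectContent();
  }
}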
 

MetaDataBackingStore - FileBasedMetaDataBackingStore (uses MapDB) is used to 
store the list of completed files in a bucket.
InMemoryMetadataBackingStore - more for experimental purposes.
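
A rough sketch of what the MapDB-backed store amounts to, assuming the MapDB 1.x 
API; the actual FileBasedMetaDataBackingStore may be organized differently:

import java.io.File;
import java.util.Set;

import org.mapdb.DB;
import org.mapdb.DBMaker;

public class CompletedFilesStore {
  private final DB db;
  private final Set<String> completed;

  public CompletedFilesStore(File backingDir) {
    this.db = DBMaker.newFileDB(new File(backingDir, "completedFiles.db"))
        .closeOnJvmShutdown()
        .make();
    this.completed = db.getHashSet("completedFiles");   // persisted set of processed S3 keys
  }

  public boolean isCompleted(String key) {
    return completed.contains(key);
  }

  public void markCompleted(String key) {
    completed.add(key);
    db.commit();        // flush so a restart does not re-process the object
  }

  public void close() {
    db.close();
  }
}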

S3ObjectEventReader - tries to closely follow ReliableSpoolingFileEventReader, 
with modifications to read the S3Object
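
For context, the reading loop in ReliableSpoolingFileEventReader boils down to 
driving an EventDeserializer over a ResettableInputStream; a simplified sketch of 
that pattern follows (the class and its wiring are assumptions, not the actual 
S3ObjectEventReader code):

import java.io.IOException;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.EventDeserializerFactory;
import org.apache.flume.serialization.ResettableInputStream;

public class S3EventReading {
  private final EventDeserializer deserializer;

  public S3EventReading(ResettableInputStream in) {
    // "LINE" is the default deserializer used by the spooling directory source
    this.deserializer = EventDeserializerFactory.getInstance("LINE", new Context(), in);
  }

  public List<Event> readEvents(int numEvents) throws IOException {
    return deserializer.readEvents(numEvents);
  }

  public void commit() throws IOException {
    // called after the events have been put on the channel, so the checkpointed
    // position is one that can safely be resumed from after a restart
    deserializer.mark();
  }
}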

S3Source - the starter class

Tests:
--------
TestResettableGenericInputStream extends TestResettableFileInputStream with small 
modifications, so all the test cases that exercise ResettableFileInputStream also 
exercise ResettableGenericInputStream.

TestS3Source - mostly based on TestSpoolingDirectorySource, needs more tests 
and revisions

Sample flume.conf
------------------

agent1.channels = ch1
agent1.sources = s3src
agent1.sinks = log-sink1


agent1.channels.ch1.type = memory 

agent1.sources.s3src.channels = ch1
agent1.sources.s3src.type = org.apache.flume.source.s3.S3Source
agent1.sources.s3src.bucketName = <bucket name to be processed/monitored>
agent1.sources.s3src.accessKey = <S3 access key>
agent1.sources.s3src.secretKey = <S3 secret key>
agent1.sources.s3src.backingDir = <Backing Directory to store tracker and MapDb files>

agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger


> S3 Source
> ---------
>
>                 Key: FLUME-2437
>                 URL: https://issues.apache.org/jira/browse/FLUME-2437
>             Project: Flume
>          Issue Type: New Feature
>            Reporter: Jonathan Natkins
>            Assignee: Ashish Paliwal
>
> There have been multiple requests on the mailing list for an S3 source



