[ https://issues.apache.org/jira/browse/FLUME-2437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14330009#comment-14330009 ]
Johny Rufus commented on FLUME-2437:
------------------------------------
Here is my implementation. I have been working on it on and off for some time.
There is a lot of room for improvements and add-ons, but the initial version
works for the base cases I have tested.
https://github.com/jrufus/flume/tree/s3
This implementation is based on the SpoolingDirectorySource logic, with further
modifications to support mark/reset on S3ObjectInputStream.
MapDb is used to store the keys of processed objects, to tell processed S3
objects apart from those not yet processed. (This could be upgraded to
ZooKeeper later.)
The code is experimental, so it needs more testing and pruning.
Tests tried manually so far:
------------------------------
1. Create multiple files in the S3 bucket - make sure the source processes all
of them
2. Add more files after the S3 source starts - make sure the newly created S3
objects are processed
3. Stop the source after a few files are processed - make sure that on restart
the source processes only the remaining unprocessed files
4. Stop the source in the middle of processing a file - make sure the position
tracker is read on restart and processing of the partial file resumes from
where it was last marked
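Scenarios 3 and 4 rely on the checkpointed position surviving a restart. Below
is a minimal sketch of that idea, assuming a plain file in the backing
directory holds the last checkpointed byte offset. FilePositionTracker and its
methods are hypothetical names for illustration, not the tracker classes in the
branch. Writing to a temporary file and then renaming it (assuming the file
system supports atomic renames) keeps a crash from leaving a half-written
position behind.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical position tracker: stores the last checkpointed byte offset of
 * the object currently being processed so a restarted source can resume there.
 */
class FilePositionTracker {
  private final Path file;

  FilePositionTracker(Path file) {
    this.file = file;
  }

  /** Persist the offset: write a temp file, then atomically rename it over the old one. */
  void storePosition(long position) throws IOException {
    Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
    Files.write(tmp, Long.toString(position).getBytes(StandardCharsets.UTF_8));
    Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
  }

  /** Offset to resume from; 0 if nothing has been checkpointed yet. */
  long getPosition() throws IOException {
    if (!Files.exists(file)) {
      return 0L;
    }
    String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
    return Long.parseLong(text);
  }
}
```

On restart, the source would reopen the partially processed object and skip
ahead to getPosition() before emitting more events.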
Main classes:
--------------
ResettableGenericInputStream contains the core logic, as a ResettableStream.
markedBuffer holds the data that has been read since the current mark, and the
tracker records the checkpointed positions in the file currently being
processed, for later recovery (the same concept as ResettableFileInputStream,
with enough modifications to work with streams that do not support mark and
reset)
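The markedBuffer idea can be sketched roughly as follows, assuming the
underlying stream can only be read forward. Every byte read after the mark is
copied into an in-memory buffer, reset() replays that buffer, and mark() drops
everything before the current position. MarkedBufferInputStream is a
hypothetical name; the class in the branch also handles the tracker and its own
read buffer, which this sketch leaves out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

/**
 * Hypothetical sketch: mark/reset over a stream that supports neither, by
 * keeping every byte read since the last mark in an in-memory buffer.
 */
class MarkedBufferInputStream extends InputStream {
  private final InputStream in;      // underlying stream, read strictly forward
  private byte[] markedBuffer = new byte[64];
  private int count = 0;             // bytes in markedBuffer (all read since the mark)
  private int index = 0;             // next buffered byte to return; index < count means replaying
  private long markPosition = 0;     // absolute offset of the current mark

  MarkedBufferInputStream(InputStream in) {
    this.in = in;
  }

  @Override
  public int read() throws IOException {
    if (index < count) {
      return markedBuffer[index++] & 0xFF;   // replay a byte read before reset()
    }
    int b = in.read();
    if (b == -1) {
      return -1;
    }
    if (count == markedBuffer.length) {
      markedBuffer = Arrays.copyOf(markedBuffer, count * 2);   // grow the buffer
    }
    markedBuffer[count++] = (byte) b;        // remember it in case of reset()
    index = count;
    return b;
  }

  @Override
  public boolean markSupported() {
    return true;
  }

  /** Checkpoint: the current position becomes the mark. readLimit is ignored, so the buffer grows until the next mark. */
  @Override
  public void mark(int readLimit) {
    markPosition = position();
    int remaining = count - index;           // bytes still waiting to be replayed, if any
    System.arraycopy(markedBuffer, index, markedBuffer, 0, remaining);
    count = remaining;
    index = 0;
  }

  /** Rewind to the last mark by replaying the buffer from its start. */
  @Override
  public void reset() {
    index = 0;
  }

  /** Absolute offset of the next byte read() will return. */
  long position() {
    return markPosition + index;
  }

  /** Offset of the last mark; this is the value a tracker would persist. */
  long getMarkPosition() {
    return markPosition;
  }
}
```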
StreamCreator tells ResettableGenericInputStream how to create a new stream
when one is needed (during a seek, when the target position is contained in
neither the markedBuffer nor the current buffer)
FileStreamCreator (mostly for testing/experimental purposes) and
S3StreamCreator are the creators passed to ResettableGenericInputStream
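The fallback path of such a seek can be sketched like this, assuming a creator
only needs to open a stream starting at a given byte offset. OffsetStreamCreator,
FileOffsetStreamCreator and SeekableReader are hypothetical stand-ins; the
branch's actual StreamCreator signature may differ, and the check against the
markedBuffer and current buffer is omitted. An S3 creator would presumably
issue a ranged GET starting at the offset instead of opening a local file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

/** Hypothetical stream factory: open a fresh stream starting at a byte offset. */
interface OffsetStreamCreator {
  InputStream create(long offset) throws IOException;
}

/** File-backed creator, useful for local testing. */
class FileOffsetStreamCreator implements OffsetStreamCreator {
  private final Path path;

  FileOffsetStreamCreator(Path path) {
    this.path = path;
  }

  @Override
  public InputStream create(long offset) throws IOException {
    FileChannel channel = FileChannel.open(path);   // opened for reading by default
    channel.position(offset);
    return Channels.newInputStream(channel);        // closing the stream closes the channel
  }
}

/** Seeks forward by reading and discarding; asks the creator for a new stream to go backward. */
class SeekableReader implements AutoCloseable {
  private final OffsetStreamCreator creator;
  private InputStream current;
  private long pos = 0;
  private int streamsOpened = 1;

  SeekableReader(OffsetStreamCreator creator) throws IOException {
    this.creator = creator;
    this.current = creator.create(0);
  }

  int read() throws IOException {
    int b = current.read();
    if (b != -1) {
      pos++;
    }
    return b;
  }

  void seek(long target) throws IOException {
    if (target < pos) {
      // Bytes behind us are gone from a forward-only stream: open a new one at the target.
      current.close();
      current = creator.create(target);
      pos = target;
      streamsOpened++;
    }
    while (pos < target && read() != -1) {
      // discard bytes until the target offset (or EOF)
    }
  }

  int openCount() {
    return streamsOpened;
  }

  @Override
  public void close() throws IOException {
    current.close();
  }
}
```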
MetaDataBackingStore - FileBasedMetaDataBackingStore (uses MapDb) stores the
list of completed files in a bucket
InMemoryMetadataBackingStore - mostly for experimental purposes
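A rough sketch of how a completed-key store lets the source pick its next
object from a bucket listing. CompletedObjectStore, InMemoryCompletedObjectStore
and ObjectSelector are hypothetical names with guessed methods, not the
interfaces in the branch; the in-memory variant forgets everything on restart,
which is why the MapDb-backed store is the one meant for real use. Processing
keys in sorted order is also an assumption made here for determinism.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical contract: remembers which object keys in a bucket are fully processed. */
interface CompletedObjectStore {
  boolean isCompleted(String key);

  void markCompleted(String key);
}

/** In-memory variant: fine for experiments, but the set is lost when the agent stops. */
class InMemoryCompletedObjectStore implements CompletedObjectStore {
  private final Set<String> completed = new HashSet<>();

  @Override
  public boolean isCompleted(String key) {
    return completed.contains(key);
  }

  @Override
  public void markCompleted(String key) {
    completed.add(key);
  }
}

final class ObjectSelector {
  private ObjectSelector() {}

  /** Next key from a bucket listing that the store has not recorded as completed, in key order. */
  static Optional<String> nextUnprocessed(Collection<String> listedKeys, CompletedObjectStore store) {
    return listedKeys.stream()
        .filter(key -> !store.isCompleted(key))
        .sorted()
        .findFirst();
  }
}
```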
S3ObjectEventReader - tries to follow ReliableSpoolingFileEventReader closely,
with modifications to read S3 objects
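The contract borrowed from ReliableSpoolingFileEventReader is that a batch
returned by readEvents() only counts as consumed once commit() is called. A
minimal sketch of that contract over lines of text follows; BatchLineReader is
a hypothetical name, and it simply redelivers the pending batch, whereas the
real reader (as I understand it) rewinds its deserializer to the last mark
instead.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical reader with the readEvents()/commit() contract: a batch that is
 * handed out but never committed is delivered again on the next readEvents().
 */
class BatchLineReader {
  private final BufferedReader reader;
  private List<String> uncommitted = null;   // last non-empty batch not yet committed

  BatchLineReader(BufferedReader reader) {
    this.reader = reader;
  }

  List<String> readEvents(int maxEvents) throws IOException {
    if (uncommitted != null) {
      return uncommitted;                     // redeliver: the channel never accepted it
    }
    List<String> batch = new ArrayList<>();
    String line;
    while (batch.size() < maxEvents && (line = reader.readLine()) != null) {
      batch.add(line);
    }
    if (batch.isEmpty()) {
      return Collections.emptyList();         // end of object: nothing to commit
    }
    uncommitted = Collections.unmodifiableList(batch);
    return uncommitted;
  }

  /** Called once the batch is safely in the channel; a real reader would also mark/checkpoint here. */
  void commit() {
    uncommitted = null;
  }
}
```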
S3Source, the starter class
Tests:
--------
TestResettableGenericInputStream extends TestResettableFileInputStream with
only minor modifications, so every test case written for
ResettableFileInputStream also runs against ResettableGenericInputStream
TestS3Source - mostly based on TestSpoolingDirectorySource; needs more tests
and revisions
Sample flume.conf
------------------
agent1.channels = ch1
agent1.sources = s3src
agent1.sinks = log-sink1
agent1.channels.ch1.type = memory
agent1.sources.s3src.channels = ch1
agent1.sources.s3src.type = org.apache.flume.source.s3.S3Source
agent1.sources.s3src.bucketName = <bucket name to be processed/monitored>
agent1.sources.s3src.accessKey = <S3 access key>
agent1.sources.s3src.secretKey = <S3 secret key>
agent1.sources.s3src.backingDir = <backing directory to store the tracker and MapDb files>
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger
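Flume roughly hands each component only the properties under its own prefix
(agent1.sources.s3src. here), with the prefix stripped. The sketch below mimics
that with java.util.Properties and reports which settings are missing.
ComponentConfig is a hypothetical helper, and treating bucketName, accessKey,
secretKey and backingDir as required is my assumption, not something the
branch is confirmed to enforce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper: pull one component's settings out of a flume.conf-style file. */
final class ComponentConfig {
  private ComponentConfig() {}

  /** Keys under the prefix (e.g. "agent1.sources.s3src."), with the prefix stripped. */
  static Map<String, String> forComponent(Properties props, String prefix) {
    Map<String, String> settings = new TreeMap<>();
    for (String name : props.stringPropertyNames()) {
      if (name.startsWith(prefix)) {
        settings.put(name.substring(prefix.length()), props.getProperty(name).trim());
      }
    }
    return settings;
  }

  /** Names of required settings that are absent or blank. */
  static List<String> missing(Map<String, String> settings, String... required) {
    List<String> absent = new ArrayList<>();
    for (String key : required) {
      String value = settings.get(key);
      if (value == null || value.isEmpty()) {
        absent.add(key);
      }
    }
    return absent;
  }
}
```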
> S3 Source
> ---------
>
> Key: FLUME-2437
> URL: https://issues.apache.org/jira/browse/FLUME-2437
> Project: Flume
> Issue Type: New Feature
> Reporter: Jonathan Natkins
> Assignee: Ashish Paliwal
>
> There have been multiple requests on the mailing list for an S3 source