[ https://issues.apache.org/jira/browse/FLUME-2718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hari updated FLUME-2718:
------------------------
    Description: 
Currently, the HTTP Source ships with JSONHandler as its default handler
implementation. A more generic alternative is a BLOBHandler that accepts any
request input stream and loads it as the Event payload. In addition, this
handler lets you define mandatory request parameters and maps those parameters
into Event headers.
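
To make the proposal concrete, below is a minimal sketch of what such a handler could look like against Flume's HTTPSourceHandler interface. It is illustrative only and not the attached patch; in particular, it assumes that only the configured mandatory parameters are copied into the headers and that a missing parameter is rejected as a bad request.

{code:title=BLOBHandler sketch (illustrative, not the attached patch)|borderStyle=solid}
package org.apache.flume.source.http;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.io.IOUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class BLOBHandler implements HTTPSourceHandler {

  // Populated from the handler sub-properties, e.g.
  // agent.sources.httpSrc.handler.mandatoryParameters = basepath, filename
  private String[] mandatoryParameters = new String[0];

  @Override
  public void configure(Context context) {
    String params = context.getString("mandatoryParameters", "").trim();
    if (!params.isEmpty()) {
      mandatoryParameters = params.split("\\s*,\\s*");
    }
  }

  @Override
  public List<Event> getEvents(HttpServletRequest request) throws Exception {
    // Copy the mandatory request parameters into the event headers,
    // rejecting the request if any of them is missing.
    Map<String, String> headers = new HashMap<String, String>();
    for (String param : mandatoryParameters) {
      String value = request.getParameter(param);
      if (value == null) {
        throw new HTTPBadRequestException("Missing mandatory parameter: " + param);
      }
      headers.put(param, value);
    }
    // Load the raw request body as the event payload.
    byte[] payload = IOUtils.toByteArray(request.getInputStream());
    return Collections.singletonList(EventBuilder.withBody(payload, headers));
  }
}
{code}

Note that HTTPSource passes the handler.* sub-properties to the handler's configure() method, which is how mandatoryParameters in the configuration below reaches it.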

This way, HTTPSource can be used as a generic data-ingress endpoint for any
sink: one can pass attributes such as basepath, filename and timestamp as
request parameters and access their values as event headers in the sink
properties.

All this can be done without developing any custom Handler code.

For example, with the agent configuration below you can send any type of data
(JSON/CSV/TSV) and store it in any sink, HDFS in this case.

{code:title=sample command|borderStyle=solid}
curl -v -X POST \
  "http://testHost:8080/?basepath=/data/&filename=test.json&timestamp=1434101498275" \
  --data @test.json
{code}
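
For the request above, such a handler would deliver one event to the channel, roughly of this shape (illustrative, under the same assumption that only the mandatory parameters become headers):

{code:title=resulting Flume event (illustrative)|borderStyle=solid}
headers: { basepath=/data/, filename=test.json }
body:    <raw bytes of test.json>
{code}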

{code:title=HDFS data path |borderStyle=solid}
/data/2015/06/12/test.json.1434101498275.lzo
{code}

{code:title=agent.conf|borderStyle=solid}
#Agent component declarations
agent.sources = httpSrc
agent.channels = memChannel
agent.sinks = hdfsSink

#HTTP Source configuration
agent.sources.httpSrc.type = http
agent.sources.httpSrc.channels = memChannel
agent.sources.httpSrc.bind = testHost
agent.sources.httpSrc.port = 8080
agent.sources.httpSrc.handler = org.apache.flume.source.http.BLOBHandler
agent.sources.httpSrc.handler.mandatoryParameters = basepath, filename

#Memory channel configuration
agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 100000
agent.channels.memChannel.transactionCapacity = 1000

#HDFS Sink configuration
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memChannel
agent.sinks.hdfsSink.hdfs.path = %{basepath}/%Y/%m/%d
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfsSink.hdfs.filePrefix = %{filename}
agent.sinks.hdfsSink.hdfs.fileType = CompressedStream
agent.sinks.hdfsSink.hdfs.codeC = lzop
{code}
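
For the sample request, the sink-side header substitution then resolves roughly as follows; the numeric component and the .lzo extension in the final file name come from the HDFS sink itself (its file-roll counter and the lzop codec), not from the handler:

{code:title=header substitution (illustrative)|borderStyle=solid}
hdfs.path        %{basepath}/%Y/%m/%d  ->  /data/2015/06/12
hdfs.filePrefix  %{filename}           ->  test.json
{code}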


> HTTP Source to support generic Stream Handler
> ---------------------------------------------
>
>                 Key: FLUME-2718
>                 URL: https://issues.apache.org/jira/browse/FLUME-2718
>             Project: Flume
>          Issue Type: Improvement
>          Components: Sinks+Sources
>            Reporter: Hari
>         Attachments: 
> 0001-FLUME-2718-HTTP-Source-to-support-generic-Stream-Han.patch
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
