[ 
https://issues.apache.org/jira/browse/FLUME-2222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13808033#comment-13808033
 ] 

Nikolaos Tsipas commented on FLUME-2222:
----------------------------------------

I did some more investigation on this issue, and it looks like the duplicated 
documents are produced when, for some reason, Flume has to roll back a transaction.
Below is an actual example of document duplication.

*Duplicate message in kibana*
!Screen Shot 2013-10-29 at 12.36.01.png|width=800!

*flume.log on the instance that produced the above log line*
{code}
29 Oct 2013 12:05:40,112 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.elasticsearch.ElasticSearchSink.process:217)  - Failed to commit transaction. Transaction rolled back.
org.elasticsearch.client.transport.NoNodeAvailableException: No node available
        at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:249)
        at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:84)
        at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:311)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
29 Oct 2013 12:05:40,113 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.elasticsearch.client.transport.NoNodeAvailableException: No node available
        at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:249)
        at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:84)
        at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:311)
{code}

It looks like the log message was indexed by Elasticsearch, but Flume was not 
aware of this because of a connection error. So it rolled back the transaction, 
and the same log line was sent a second time.
Does this make sense? I think it does, but I'd like to hear your thoughts on 
this issue.
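
To make the suspected sequence concrete, here is a minimal sketch in plain Java. It does not use the Flume or Elasticsearch APIs: {{FakeIndex}}, {{FakeChannel}}, {{deliver}} and the {{loseNextAck}} flag are hypothetical names made up for illustration, and the sketch assumes that the write is stored before the error reaches the sink, which is only my guess at what happened.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RollbackDuplicateSketch {

    /** Hypothetical stand-in for an index that stores every document it receives. */
    static class FakeIndex {
        final List<String> docs = new ArrayList<>();
        boolean loseNextAck = false;

        /** Stores the document, then optionally fails as if the acknowledgement was lost. */
        void index(String doc) {
            docs.add(doc); // assumption: the write itself succeeds
            if (loseNextAck) {
                loseNextAck = false;
                throw new IllegalStateException("No node available (simulated)");
            }
        }
    }

    /** Hypothetical stand-in for a channel with take/commit/rollback semantics. */
    static class FakeChannel {
        final Deque<String> queue = new ArrayDeque<>();
        private String inFlight;

        String take() {
            inFlight = queue.poll();
            return inFlight;
        }

        void commit() {
            inFlight = null;
        }

        /** Puts the in-flight event back at the head of the queue; the index is not touched. */
        void rollback() {
            if (inFlight != null) {
                queue.addFirst(inFlight);
                inFlight = null;
            }
        }
    }

    /** Drains the channel, rolling back and retrying whenever indexing reports a failure. */
    static int deliver(FakeChannel channel, FakeIndex index) {
        int rollbacks = 0;
        while (!channel.queue.isEmpty()) {
            String event = channel.take();
            try {
                index.index(event);
                channel.commit();
            } catch (IllegalStateException e) {
                channel.rollback(); // only local state is undone
                rollbacks++;
            }
        }
        return rollbacks;
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        channel.queue.add("log line A");
        channel.queue.add("log line B");

        FakeIndex index = new FakeIndex();
        index.loseNextAck = true; // first write is stored, but the sender sees an error

        int rollbacks = deliver(channel, index);
        System.out.println("rollbacks=" + rollbacks + " docs=" + index.docs);
        // prints: rollbacks=1 docs=[log line A, log line A, log line B]
    }
}
```

In this sketch a single rollback leaves "log line A" stored twice, because the rollback restores only the sender's queue and cannot undo the write that had already been stored.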

Regards,
Nick

> Duplicate entries in Elasticsearch when using Flume elasticsearch-sink
> ----------------------------------------------------------------------
>
>                 Key: FLUME-2222
>                 URL: https://issues.apache.org/jira/browse/FLUME-2222
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>         Environment: centos 6
>            Reporter: Nikolaos Tsipas
>              Labels: elasticsearch, sink
>         Attachments: Screen Shot 2013-10-29 at 12.36.01.png
>
>
> Hello,
> I'm using flume elasticsearch-sink to transfer logs from ec2 instances to 
> elasticsearch and I get duplicate entries for numerous documents. 
> I've noticed this issue when I was sending a specific number of log lines to 
> elasticsearch using flume and then I was counting them using kibana to verify 
> that all of them arrived. Most of the time, especially when multiple flume 
> instances were used, I was getting duplicate entries. e.g. instead of 
> receiving 10000 documents from an instance, I was receiving 10060. 
> Duplication level seems to be proportional to the number of instances sending 
> log data simultaneously. e.g. with 3 flume instances I get 10060, with 50 
> flume instances I get 10300.
> Is duplication something that I should expect when using flume 
> elasticsearch-sink?
> There is a {{doRollback()}} method that is called on transaction failure but 
> I think that it updates only the local flume channel and not elasticsearch.
> Any info/suggestions would be appreciated.
> Regards,
> Nick



--
This message was sent by Atlassian JIRA
(v6.1#6144)
