> On March 17, 2014, 4:37 p.m., Edward Sargisson wrote:
> > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java, line 107
> > <https://reviews.apache.org/r/16650/diff/6/?file=521231#file521231line107>
> >
> >     Woah! What?
> >     Can we discuss whether a lock is the best approach here? This will serialize every write going through to ES.
> >     
> >     This doesn't mean it isn't the best way; I'd just like to discuss it. What has forced us into needing it? Could we use other techniques instead, such as immutable objects?
> 
> Paweł wrote:
>     Hi,
>     That's a good point. I had doubts about whether using synchronized was a good idea, but I'm worried about the situation where the sink's addEvent is called by many threads. Maybe it would be better to store those events in a thread-safe data structure and serialize them only in the execute method. What do you think?

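Paweł's alternative (queue the events in a thread-safe structure and serialize them only when the batch is executed) could look roughly like the following. This is a minimal sketch, not code from the patch: the class name `QueuedBulkClientSketch`, the use of plain JSON strings as events, and the newline-joined payload are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch, not the patch's code: addEvent() only enqueues, so many
// threads can call it without a lock; execute() drains the queue and serializes
// the whole batch in one place.
final class QueuedBulkClientSketch {
    // Lock-free, thread-safe FIFO holding events that have not been sent yet.
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    void addEvent(String body) {
        pending.add(body);
    }

    // Drains whatever is queued right now and joins it into one newline-separated payload.
    String execute() {
        List<String> batch = new ArrayList<>();
        String next;
        while ((next = pending.poll()) != null) {
            batch.add(next);
        }
        return String.join("\n", batch);
    }

    public static void main(String[] args) throws InterruptedException {
        QueuedBulkClientSketch client = new QueuedBulkClientSketch();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int writerId = t;
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    client.addEvent("{\"writer\":" + writerId + ",\"n\":" + i + "}");
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            w.join();
        }
        System.out.println(client.execute().lines().count()); // prints 400
    }
}
```

Because addEvent only enqueues, it needs no lock; an event added while execute is draining is either included in the current batch or left for the next execute call, never lost.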
I refreshed my memory of the concurrency design so that we could check the reasoning.
Unless I've missed something, sinks are single-threaded. The flow goes SinkRunner.start() -> new Thread(new PollingRunner()) -> (AbstractSinkProcessor-derived).process() -> AbstractSink.process().
On that basis, whether or not the method is synchronized won't make much difference.
I'm happy with this.

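The call chain above can be modeled in a few lines to show why the lock costs little: if exactly one runner thread ever calls process(), a synchronized method on the sink is never contended. This is a simplified, hypothetical model, not Flume's SinkRunner: the class names are invented, the processor step is folded into the runner loop, and a fixed iteration count stands in for the real loop's run-until-stopped condition.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of the chain described above:
// SinkRunner.start() -> new Thread(new PollingRunner()) -> processor -> sink.process().
// The processor step is folded into the runner loop; real Flume adds lifecycle, backoff, etc.
final class SingleThreadedSinkModel {

    interface Sink {
        void process();
    }

    // Stand-in for the ES sink: records every thread that ever calls process().
    static final class RecordingSink implements Sink {
        final Set<String> callerThreads = ConcurrentHashMap.newKeySet();
        private int writes; // guarded by "this"

        @Override
        public synchronized void process() {
            // With a single runner thread this lock is always uncontended.
            callerThreads.add(Thread.currentThread().getName());
            writes++;
        }

        synchronized int writes() {
            return writes;
        }
    }

    // Stand-in for SinkRunner: starts exactly one polling thread that drives the sink.
    static final class Runner {
        private final Sink sink;
        private final int iterations;

        Runner(Sink sink, int iterations) {
            this.sink = sink;
            this.iterations = iterations;
        }

        Thread start() {
            Thread polling = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    sink.process();
                }
            }, "sink-runner");
            polling.start();
            return polling;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RecordingSink sink = new RecordingSink();
        new Runner(sink, 1_000).start().join();
        System.out.println(sink.callerThreads + " " + sink.writes()); // prints [sink-runner] 1000
    }
}
```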

> On March 17, 2014, 4:37 p.m., Edward Sargisson wrote:
> > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java, line 130
> > <https://reviews.apache.org/r/16650/diff/6/?file=521231#file521231line130>
> >
> >     This code doesn't handle the case where ES applied the change but the status code was lost. We may need to add a warning about this to the docs. In our experience, setting an ID early in the event stream and letting the last write win works well here.
> 
> Paweł wrote:
>     I don't think I quite follow.
>     When ES returns an error, the change is sent again. What do you mean by adding a warning to the docs?

By default, ES generates a random ID for each document. When writing to ES, the acknowledgement of the write can be lost, so the sink may write a batch, never receive the ack, and then write the batch again.
In that case you'll have duplicate documents.

Hence, adding a warning to the docs about this edge case could be useful.

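The duplicate-document edge case, and the "set an ID early, last write wins" mitigation, can be illustrated with an in-memory stand-in for the index. This is a hypothetical model rather than Elasticsearch or the sink's code; it mirrors only the behavior that indexing a document with an explicit ID replaces any existing document with that ID, whereas omitting the ID makes Elasticsearch generate a new one. `FakeIndex`, `Event`, and carrying the ID on the event are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model, not Elasticsearch or the sink's code. It shows why
// "write succeeded, ack lost, batch retried" duplicates documents when the store
// generates IDs, and why an ID assigned early makes the retry harmless.
final class RetryDuplicationSketch {

    // Stand-in for an index: document ID -> document body.
    static final class FakeIndex {
        final Map<String, String> docs = new HashMap<>();

        // Like indexing without an ID: a fresh random ID is generated on every write.
        void indexAutoId(String body) {
            docs.put(UUID.randomUUID().toString(), body);
        }

        // Like indexing with an explicit ID: a repeated write replaces the earlier one.
        void indexWithId(String id, String body) {
            docs.put(id, body);
        }
    }

    // Hypothetical event that was given its ID when it entered the pipeline.
    static final class Event {
        final String id;
        final String body;

        Event(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(
                new Event(UUID.randomUUID().toString(), "login alice"),
                new Event(UUID.randomUUID().toString(), "logout bob"));

        // Each batch is written twice: the first write lands but its ack is lost, so it is retried.
        FakeIndex autoIds = new FakeIndex();
        FakeIndex earlyIds = new FakeIndex();
        for (int attempt = 0; attempt < 2; attempt++) {
            for (Event e : batch) {
                autoIds.indexAutoId(e.body);
                earlyIds.indexWithId(e.id, e.body);
            }
        }

        System.out.println(autoIds.docs.size() + " vs " + earlyIds.docs.size()); // prints 4 vs 2
    }
}
```

The catch is that the ID must be assigned once, upstream, and travel with the event; an ID generated afresh on every write attempt would behave exactly like the auto-ID case.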

- Edward


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16650/#review33951
-----------------------------------------------------------


On March 16, 2014, 6:40 p.m., Paweł wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/16650/
> -----------------------------------------------------------
> 
> (Updated March 16, 2014, 6:40 p.m.)
> 
> 
> Review request for Flume, Brock Noland, Hari Shreedharan, and Mike Percy.
> 
> 
> Repository: flume-git
> 
> 
> Description
> -------
> 
> The patch changes the Elasticsearch sink so that it can switch between the Transport Client and a REST client for Elasticsearch.
> The Jest library (https://github.com/searchbox-io/Jest) client is used to communicate with Elasticsearch over the HTTP API. Jest depends on Guava 14, so it was also necessary to change the version of Guava used in Flume (11.0.2 -> 14.0.1).
> 
> Patch FLUME-2225-0.patch
> 
> 
> Diffs
> -----
> 
>   flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml bdc21d1
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java 6effe34
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java 8e77a1e
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java e38ab19
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java dd0c59d
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java 43a4b12
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java 1e4e119
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java 9dff4b0
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java 71789e8
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java PRE-CREATION
>   flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/16650/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Paweł
> 
>

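The review request describes a sink that can switch between the Transport Client and a REST client (Jest over HTTP). One common way to structure that choice is a small factory keyed on a configured type string. The sketch below is hypothetical: the names are invented, and the actual signatures of the patch's ElasticSearchClientFactory, ElasticSearchClient and NoSuchClientTypeException do not appear in this thread and may differ.

```java
import java.util.Locale;

// Hypothetical sketch of choosing a client implementation from a configured type string.
// Names are illustrative only and are not the patch's real classes or signatures.
final class ClientSelectionSketch {

    interface BulkClient {
        String describe();
    }

    // Would wrap the native Elasticsearch transport client.
    static final class TransportBulkClient implements BulkClient {
        public String describe() { return "transport"; }
    }

    // Would talk to the Elasticsearch HTTP API (for example through Jest).
    static final class RestBulkClient implements BulkClient {
        public String describe() { return "rest"; }
    }

    // Thrown for an unrecognized type, loosely modeled on NoSuchClientTypeException.
    static final class UnknownClientTypeException extends Exception {
        UnknownClientTypeException(String type) {
            super("Unknown client type: " + type);
        }
    }

    static BulkClient create(String type) throws UnknownClientTypeException {
        switch (type.toLowerCase(Locale.ROOT)) {
            case "transport":
                return new TransportBulkClient();
            case "rest":
                return new RestBulkClient();
            default:
                throw new UnknownClientTypeException(type);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(create("REST").describe()); // prints rest
        try {
            create("thrift");
        } catch (UnknownClientTypeException e) {
            System.out.println(e.getMessage()); // prints Unknown client type: thrift
        }
    }
}
```

Failing fast on an unknown type keeps a misconfigured sink from silently falling back to a client the user did not ask for.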