> On March 17, 2014, 4:37 p.m., Edward Sargisson wrote: > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java, > > line 107 > > <https://reviews.apache.org/r/16650/diff/6/?file=521231#file521231line107> > > > > Woah! What? > > Can we discuss if a lock is the best approach here? This will serialize > > every write going though to ES. > > > > This doesn't mean this isn't the best way - I'd just like to discuss > > it. What's forced us into needing it? Can we not use some other techniques > > like immutable objects, etc? > > Pawe? wrote: > Hi, > That's good point. I had doubt if it is good to use synchronized. But I'm > just afraid of situation where sink's addEvent have multiple calls by many > threads. Maybe it is better to store that events in some thread safe > datastructure and serialize objects only on execute method. What do you think?
I refreshed my memory of the concurrency design so that we could check the reasoning. Unless I've missed something, the Sinks are single-threaded. The flow goes SinkRunner.start() -> new Thread(new PollingRunner()) -> (AbstractSinkProcess derived).process() -> AbstractSink.process(). On that basis, the presence/absence of synchronized won't harm anything much. I'm happy with this. > On March 17, 2014, 4:37 p.m., Edward Sargisson wrote: > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java, > > line 130 > > <https://reviews.apache.org/r/16650/diff/6/?file=521231#file521231line130> > > > > This code doesn't particularly handle the case where es wrote the > > change but the status code got lost. We may need to add a warning in the > > docs about this. In our experience we find that setting an ID early in the > > event stream and using last write wins here works well. > > Pawe? wrote: > I think I don't actually catch what you mean. > When there is some error from ES the change is send again. What do you > mean by adding warning in the docs? By default, es randomly generated an ID for a document. When writing to es, it's possible for the acknowledgement of the write to be lost. Thus, the sink may write a batch, never receive the ack and then write it again. In this circumstance you'll have duplicate documents. Hence, adding a warning to the docs about this edge case could be useful. - Edward ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/16650/#review33951 ----------------------------------------------------------- On March 16, 2014, 6:40 p.m., Pawe? wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/16650/ > ----------------------------------------------------------- > > (Updated March 16, 2014, 6:40 p.m.) > > > Review request for Flume, Brock Noland, Hari Shreedharan, and Mike Percy. > > > Repository: flume-git > > > Description > ------- > > The patch contains changes in elasticsearch sink. It gives possibility to > swich between Transport Client and REST Client to elasticsearch. > Jest library (https://github.com/searchbox-io/Jest) client was used to > communicate with elasticsearch by HTTP API. It uses guava 14 so it was > necessary also to change version of guava used in flume (11.0.2 -> 14.0.1) > > Patch FLUME-2225-0.patch > > > Diffs > ----- > > flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml bdc21d1 > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java > 6effe34 > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java > 8e77a1e > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java > e38ab19 > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java > dd0c59d > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java > 43a4b12 > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java > 1e4e119 > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java > 9dff4b0 > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java > 71789e8 > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java > PRE-CREATION > > flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java > PRE-CREATION > > Diff: https://reviews.apache.org/r/16650/diff/ > > > Testing > ------- > > > Thanks, > > Pawe? > >
