[ 
https://issues.apache.org/jira/browse/FLINK-20579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20579:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> each es sink will have 
> -----------------------
>
>                 Key: FLINK-20579
>                 URL: https://issues.apache.org/jira/browse/FLINK-20579
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / ElasticSearch
>            Reporter: donglei
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> BulkProcessorListener.beforeBulk should set the same route on every request 
> in a bulk, to speed up writes to ES.
>  
> As we know, a bulk whose requests all share one route is sent to only one ES 
> node, with one network I/O and one disk I/O, so every 
>  
> !http://km.oa.com/files/photos/captures/202007/1593922902_79_w1275_h710.png!
>  
> Therefore, we take the following approach: beforeBulk in 
> ElasticsearchSinkBase assigns the same route to every request in a batch, 
> like this:
> private class BulkProcessorListener implements BulkProcessor.Listener {
>     @Override
>     public void beforeBulk(long executionId, BulkRequest request) {
>         // Still to verify: whether a route has already been set upstream
>         if (routePreBulk) {
>             // One route per bulk, so every request in it maps to the same shard
>             String routing = UUID.randomUUID() + "_" + executionId;
>             request.requests().forEach(x -> {
>                 // Only index requests are re-routed; other request types are left as-is
>                 if (x instanceof IndexRequest) {
>                     ((IndexRequest) x).routing(routing);
>                 }
>             });
>             LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(), routing);
>         }
>     }
>     // afterBulk(...) overrides omitted
> }
> The advantage is that when the index has many shards, every bulk carries a 
> single route and is sent to a single ES node. This saves the time ES spends 
> splitting the bulk across shards and persisting the data, and improves ES 
> write performance.
> By a preliminary estimate, this can improve performance by more than 2x.
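
To make the reasoning above concrete, here is a minimal sketch of how a route value decides the target shard. It is an illustration under assumptions, not Elasticsearch's implementation: Elasticsearch hashes the routing value with Murmur3, while this sketch substitutes String.hashCode(), and the names RoutingSketch, shardFor and shardsTouched are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

public class RoutingSketch {
    // Simplified routing rule: shard = hash(routing) mod number_of_shards.
    // Assumption: String.hashCode() stands in for the Murmur3 hash used by Elasticsearch.
    static int shardFor(String routing, int numShards) {
        return Math.floorMod(routing.hashCode(), numShards);
    }

    // Counts how many distinct shards one bulk touches,
    // given the routing value of each request in that bulk.
    static int shardsTouched(List<String> routings, int numShards) {
        Set<Integer> shards = new HashSet<>();
        for (String routing : routings) {
            shards.add(shardFor(routing, numShards));
        }
        return shards.size();
    }

    public static void main(String[] args) {
        int numShards = 16;

        // Default behaviour: each document is routed by its own id.
        List<String> perDocument = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            perDocument.add("doc-" + i);
        }

        // Proposed behaviour: one random route shared by the whole bulk.
        String shared = UUID.randomUUID() + "_" + 1L;
        List<String> perBulk = Collections.nCopies(1000, shared);

        System.out.println("per-document routing touches "
                + shardsTouched(perDocument, numShards) + " shards");
        System.out.println("per-bulk routing touches "
                + shardsTouched(perBulk, numShards) + " shard");
    }
}
```

With a shared route the bulk always touches exactly one shard, whatever the hash function is. With per-document routing the same bulk is split across several shards, which is the splitting cost the proposal tries to avoid.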
> The discussion points here are:
> Q: Can we use keyBy upstream and give each key the same route value?
> A: No. The goal of this feature is performance, and setting the same route 
> value after an upstream keyBy cannot guarantee that all of that data is sent 
> in one batch. For example, 10,000 records may share one route value, but 
> there is no guarantee that those 10,000 records end up in the same batch.
> Q: How do we decide whether to add a route value?
> A: Since Oceanus cannot provide an external API for this, we recommend 
> sampling: check whether any request in a batch already has a route. If none 
> does, assume that this sink does not need its own route value.
> Q: Is the data evenly distributed?
> A: We have been running this for a long time. With this setting, because the 
> per-bulk route values are uniformly distributed, the ES data is evenly 
> distributed as well.
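
The sampling check from the second answer above could look roughly like the sketch below. It works on plain strings so that it runs without Elasticsearch on the classpath: each list element is assumed to be the routing value read from one request in the sampled batch, with null meaning that no route was set. RoutingSampler and upstreamSetsRouting are hypothetical names, and how the result feeds the routePreBulk flag is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class RoutingSampler {
    // Returns true if at least one request in the sampled batch already carries a route.
    // Assumption: null represents "no route set" for a request.
    static boolean upstreamSetsRouting(List<String> sampledRoutings) {
        return sampledRoutings.stream().anyMatch(Objects::nonNull);
    }

    public static void main(String[] args) {
        List<String> plainBatch = Arrays.asList((String) null, null, null);
        List<String> routedBatch = Arrays.asList(null, "user-42", null);

        // The per-bulk route would only be added when upstream sets no route.
        boolean routePreBulkForPlain = !upstreamSetsRouting(plainBatch);
        boolean routePreBulkForRouted = !upstreamSetsRouting(routedBatch);

        System.out.println("plain batch  -> add per-bulk route: " + routePreBulkForPlain);
        System.out.println("routed batch -> add per-bulk route: " + routePreBulkForRouted);
    }
}
```

A single sampled batch can miss requests that are only occasionally routed upstream, so a real implementation might sample several batches before deciding.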
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
