[ https://issues.apache.org/jira/browse/FLINK-20579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-20579:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was:
auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> each es sink will have
> -----------------------
>
> Key: FLINK-20579
> URL: https://issues.apache.org/jira/browse/FLINK-20579
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / ElasticSearch
> Reporter: donglei
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
>
> BulkProcessorListener#beforeBulk should give every request in a bulk the same
> routing value, to speed up writes to ES.
>
> As we know, all requests in a bulk that share one routing value go to the same
> shard, and therefore to only one ES node, with one network I/O and one disk
> I/O, so every
>
> !http://km.oa.com/files/photos/captures/202007/1593922902_79_w1275_h710.png!
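A minimal sketch of why one routing value per bulk means one shard, under stated assumptions: the hypothetical `RoutingSketch` class models shard selection as `hash(routing) % numPrimaryShards`, with `String.hashCode()` standing in for the murmur3 hash Elasticsearch actually uses (the real formula also accounts for `index.number_of_routing_shards`). Without a custom routing, the document `_id` is the routing value, so a bulk spreads across shards; with a shared per-bulk routing, every request maps to the same shard.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

final class RoutingSketch {
    // Simplified shard selection: String.hashCode() stands in for the
    // murmur3 hash that Elasticsearch applies to the routing value.
    static int shardFor(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Counts the distinct shards a bulk touches. A null bulkRouting models
    // the default behaviour, where each document's _id is its routing value.
    static int shardsTouched(String[] docIds, String bulkRouting, int numPrimaryShards) {
        Set<Integer> shards = new HashSet<>();
        for (String id : docIds) {
            String routing = bulkRouting != null ? bulkRouting : id;
            shards.add(shardFor(routing, numPrimaryShards));
        }
        return shards.size();
    }

    public static void main(String[] args) {
        // Hypothetical bulk of 1,000 documents written to a 16-shard index.
        String[] ids = new String[1000];
        for (int i = 0; i < ids.length; i++) {
            ids[i] = "doc-" + i;
        }
        int shards = 16;
        System.out.println("shards touched with _id routing: "
                + shardsTouched(ids, null, shards));
        // Per-bulk routing in the same form as the proposal: UUID + "_" + executionId.
        String bulkRouting = UUID.randomUUID() + "_" + 1L;
        System.out.println("shards touched with per-bulk routing: "
                + shardsTouched(ids, bulkRouting, shards)); // 1
    }
}
```

With the default `_id` routing the hypothetical documents are scattered over many of the 16 shards; with the shared per-bulk routing they all map to exactly one.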
>
> We therefore take the following approach: in ElasticsearchSinkBase,
> beforeBulk assigns the same routing value to every request in a batch, like
> this:
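> private class BulkProcessorListener implements BulkProcessor.Listener {
>     @Override
>     public void beforeBulk(long executionId, BulkRequest request) {
>         // routePreBulk: whether to assign one routing value per bulk.
>         // Still to verify: skip this when a routing is already set upstream.
>         if (routePreBulk) {
>             String routing = UUID.randomUUID() + "_" + executionId;
>             // DocWriteRequest covers index, update and delete requests, so no
>             // cast to IndexRequest is needed (assumes a client where
>             // DocWriteRequest has routing(String), e.g. Elasticsearch 7)
>             for (DocWriteRequest<?> docRequest : request.requests()) {
>                 docRequest.routing(routing);
>             }
>             LOG.info("start bulk actions: {}, routing: {}",
>                     request.numberOfActions(), routing);
>         }
>     }
>
>     // afterBulk(...) callbacks omitted
> }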
> The advantage is that when an index has many shards, every bulk carries a
> single routing value and is therefore sent to a single ES node. This saves the
> time ES spends splitting the bulk across shards and writing the pieces to
> disk, and improves ES write performance. By our preliminary estimates, this
> change more than doubles write performance.
> The discussion points here are:
> Q: Can we use keyBy upstream and give each key the same routing value instead?
> A: The goal of this feature is performance. Setting the same routing value
> after an upstream keyBy does not guarantee that the records sharing that value
> are sent in one batch: for example, 10,000 records may share one routing
> value, but nothing guarantees that those 10,000 records land in the same batch.
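To illustrate the keyBy point, here is a minimal model (not Flink code) of a sink that flushes every `batchSize` records, as a BulkProcessor does when its bulk-actions limit is reached. The class `KeyByRoutingSketch`, the interleaved four-key stream and the batch size of 1,000 are all hypothetical. Routing each record by its key leaves several routing values in every batch, and each key's records are spread over many batches.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class KeyByRoutingSketch {
    // For each batch of batchSize consecutive records, counts the distinct
    // routing values when the routing value is the record's keyBy key.
    static List<Integer> routingsPerBatchByKey(String[] recordKeys, int batchSize) {
        List<Integer> result = new ArrayList<>();
        for (int start = 0; start < recordKeys.length; start += batchSize) {
            Set<String> routings = new HashSet<>();
            int end = Math.min(start + batchSize, recordKeys.length);
            for (int i = start; i < end; i++) {
                routings.add(recordKeys[i]); // routing = keyBy key
            }
            result.add(routings.size());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical stream: 10,000 records from 4 keys, interleaved.
        String[] keys = new String[10_000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "key-" + (i % 4);
        }
        // Hypothetical flush threshold of 1,000 actions per bulk.
        System.out.println(routingsPerBatchByKey(keys, 1_000));
        // prints [4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
    }
}
```

Every bulk in this model still targets up to four shards, and the 2,500 records of `key-0` end up in all ten bulks, whereas assigning one routing value per bulk in beforeBulk gives exactly one routing value per bulk by construction.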
> Q: How do we decide whether to add a routing value?
> A: Since Oceanus cannot provide an external API for this, we recommend
> sampling: for example, check whether any request in a batch already has a
> routing value; if none do, assume this sink does not rely on upstream routing,
> so a per-bulk routing value can be added.
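The sampling check could look roughly like the following sketch. `RoutingSampler` and `needsPerBulkRouting` are hypothetical names, and routing values are modelled as plain strings (`null` meaning no routing was set upstream); in a real listener they would be read from each request in the `BulkRequest`, for example through `DocWriteRequest#routing()` in the Elasticsearch 7 client (an assumption for other client versions).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class RoutingSampler {
    // Inspects at most sampleSize routing values from one batch. Returns true
    // when none of the sampled requests carries a routing value, i.e. the sink
    // may assign its own per-bulk routing.
    static boolean needsPerBulkRouting(List<String> batchRoutings, int sampleSize) {
        return batchRoutings.stream()
                .limit(sampleSize)
                .allMatch(Objects::isNull);
    }

    public static void main(String[] args) {
        List<String> noRouting = Arrays.asList(null, null, null);
        List<String> someRouting = Arrays.asList(null, "user-42", null);
        System.out.println(needsPerBulkRouting(noRouting, 100));   // true
        System.out.println(needsPerBulkRouting(someRouting, 100)); // false
    }
}
```

Because only the first `sampleSize` requests are inspected, a routing value set on a later request is missed; that is the trade-off of sampling instead of scanning the whole batch.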
> Q: Is the data evenly distributed?
> A: We have been running this for a long time. Because each bulk gets a random,
> UUID-based routing value, routing values are uniformly distributed, so the data
> in ES stays evenly distributed.
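A rough sketch of the uniformity argument, assuming a simplified shard selection of `hash(routing) % numPrimaryShards` with `String.hashCode()` standing in for the murmur3 hash Elasticsearch actually uses. The hypothetical `RoutingUniformitySketch` generates routing values in the proposal's `UUID.randomUUID() + "_" + executionId` form and counts how many bulks land on each shard. It measures bulks, not bytes, so it only implies even data volume if bulks are of similar size.

```java
import java.util.Arrays;
import java.util.UUID;

final class RoutingUniformitySketch {
    // Simplified shard selection; String.hashCode() replaces murmur3 here.
    static int shardFor(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Counts how many bulks land on each shard when every bulk receives
    // a fresh routing value of the form UUID + "_" + executionId.
    static int[] bulksPerShard(int numBulks, int numPrimaryShards) {
        int[] counts = new int[numPrimaryShards];
        for (long executionId = 0; executionId < numBulks; executionId++) {
            String routing = UUID.randomUUID() + "_" + executionId;
            counts[shardFor(routing, numPrimaryShards)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 16,000 hypothetical bulks over 16 shards: about 1,000 bulks per shard.
        System.out.println(Arrays.toString(bulksPerShard(16_000, 16)));
    }
}
```

The exact counts vary from run to run because the UUIDs are random, but each shard receives close to 1,000 of the 16,000 bulks.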
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)