donglei created FLINK-20579:
-------------------------------
Summary: Each ES sink will have
Key: FLINK-20579
URL: https://issues.apache.org/jira/browse/FLINK-20579
Project: Flink
Issue Type: Improvement
Reporter: donglei
The beforeBulk callback of BulkProcessorListener should give every request in a bulk the same routing value, to speed up writes to Elasticsearch (ES).
As we know, a bulk whose requests all share the same routing value is sent to only one ES node, costing one network I/O and one disk I/O, so every
!http://km.oa.com/files/photos/captures/202007/1593922902_79_w1275_h710.png!
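The reason one routing value means one node is that ES picks the target shard from a hash of the routing value (by default the document `_id`). The sketch below illustrates this with a deliberately simplified model: `String.hashCode()` modulo the number of primary shards stands in for ES's real Murmur3-based formula, and the class and method names are hypothetical. It counts how many shards a 1,000-document bulk touches with default routing versus a single per-bulk routing value.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ShardFanOut {
    // Simplified shard selection (assumption): real ES hashes _routing with Murmur3;
    // String.hashCode() is only a stand-in to show the effect.
    public static int shardFor(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Number of distinct shards one bulk touches. A null bulkRouting falls back to
    // the document id, mirroring ES's default of routing by _id.
    public static int shardsTouched(List<String> docIds, String bulkRouting, int numPrimaryShards) {
        Set<Integer> shards = new HashSet<>();
        for (String id : docIds) {
            String routing = (bulkRouting != null) ? bulkRouting : id;
            shards.add(shardFor(routing, numPrimaryShards));
        }
        return shards.size();
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add("doc-" + i);
        }
        // Default routing: the bulk is split across all 10 shards.
        System.out.println(shardsTouched(ids, null, 10));      // prints 10
        // One routing value for the whole bulk: a single shard.
        System.out.println(shardsTouched(ids, "bulk-42", 10)); // prints 1
    }
}
```

With default routing the coordinating node has to split the bulk into per-shard sub-requests; with one routing value there is only one sub-request.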
We therefore propose the following approach: beforeBulk in ElasticsearchSinkBase assigns one routing value to all requests of the same bulk, like this:
private class BulkProcessorListener implements BulkProcessor.Listener {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // routePreBulk (proposed flag): true only after verifying that no routing is set upstream
        if (routePreBulk) {
            // One random routing value per bulk, so every document in it goes to the same shard
            String routing = UUID.randomUUID() + "_" + executionId;
            request.requests().forEach(x -> {
                // Only index requests are rerouted; other request types are left unchanged
                if (x instanceof IndexRequest) {
                    ((IndexRequest) x).routing(routing);
                }
            });
            LOG.info("start bulk actions: {}, routing: {}", request.numberOfActions(), routing);
        }
    }

    // ... afterBulk(...) overrides omitted
}
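To check the per-bulk assignment logic without an ES dependency, here is a standalone sketch. `FakeIndexRequest` is a hypothetical stand-in that models only the routing field of an `IndexRequest`; `assignBulkRouting` mirrors the body of the proposed `beforeBulk`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class PerBulkRouting {
    // Hypothetical stand-in for an ES IndexRequest; only routing is modelled.
    public static final class FakeIndexRequest {
        public String routing;
    }

    // Same idea as the proposed beforeBulk: one fresh "<uuid>_<executionId>"
    // routing value per bulk, written onto every request in that bulk.
    public static String assignBulkRouting(long executionId, List<FakeIndexRequest> bulk) {
        String routing = UUID.randomUUID() + "_" + executionId;
        for (FakeIndexRequest request : bulk) {
            request.routing = routing;
        }
        return routing;
    }

    public static List<FakeIndexRequest> newBulk(int size) {
        List<FakeIndexRequest> bulk = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            bulk.add(new FakeIndexRequest());
        }
        return bulk;
    }

    public static void main(String[] args) {
        List<FakeIndexRequest> first = newBulk(500);
        List<FakeIndexRequest> second = newBulk(500);
        String r1 = assignBulkRouting(1L, first);
        String r2 = assignBulkRouting(2L, second);
        System.out.println(first.stream().allMatch(r -> r1.equals(r.routing))); // true
        System.out.println(r1.equals(r2));                                      // false
    }
}
```

Appending the executionId to the UUID keeps routing values distinct across bulks even in the unlikely case of a UUID collision.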
The advantage is that when the index has many shards, each bulk carries a single routing value and is sent to a single ES node. This saves the time ES would otherwise spend splitting the bulk by shard and persisting the pieces on several nodes, and improves overall ES write performance.
By our preliminary estimates, this change can improve write performance by more than 2x.
The discussion points here are:
Q: Can we instead use keyBy upstream and set the same routing value?
A: No. The goal here is performance, and setting the same routing value after an upstream keyBy does not guarantee that those records are sent in one bulk. For example, 10,000 (1w) records may share one routing value, but nothing guarantees that those 10,000 records end up in the same bulk.
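A small simulation makes the point concrete. It assumes a simplified, count-based flush (every `maxActions` records; time- and size-based flushes are ignored) and a hypothetical arrival order in which three keyed streams are interleaved round-robin at one sink subtask. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class KeyByVsBulk {
    // Simplified count-based flushing: cut the arrival-ordered stream into
    // bulks of at most maxActions records.
    public static List<List<String>> toBulks(List<String> keys, int maxActions) {
        List<List<String>> bulks = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += maxActions) {
            int end = Math.min(start + maxActions, keys.size());
            bulks.add(new ArrayList<>(keys.subList(start, end)));
        }
        return bulks;
    }

    // Largest number of distinct routing keys found in any single bulk.
    public static int maxDistinctKeysPerBulk(List<List<String>> bulks) {
        int max = 0;
        for (List<String> bulk : bulks) {
            max = Math.max(max, new HashSet<>(bulk).size());
        }
        return max;
    }

    // Number of bulks containing at least one record with the given key.
    public static int bulksContaining(List<List<String>> bulks, String key) {
        int n = 0;
        for (List<String> bulk : bulks) {
            if (bulk.contains(key)) {
                n++;
            }
        }
        return n;
    }

    // Hypothetical arrival order: keys interleaved round-robin, perKey records each.
    public static List<String> interleaved(String[] keys, int perKey) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < perKey * keys.length; i++) {
            out.add(keys[i % keys.length]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> bulks = toBulks(interleaved(new String[] {"a", "b", "c"}, 10_000), 1000);
        System.out.println(bulks.size());                  // 30
        System.out.println(maxDistinctKeysPerBulk(bulks)); // 3
        System.out.println(bulksContaining(bulks, "a"));   // 30
    }
}
```

Even though every record of key "a" shares one routing value, those 10,000 records are spread over all 30 bulks, and every bulk still mixes three routing values, so each bulk is split across shards again.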
Q: How do we decide whether to add a routing value?
A: Since Oceanus cannot provide an external API for this, we recommend sampling: for example, check whether any request in a bulk already carries a routing value. If none does, assume that this sink does not rely on an upstream routing value.
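The sampling check could look roughly like the following. This is a hypothetical helper, not part of ElasticsearchSinkBase: it assumes the routing values of a bulk's requests have been collected into a list (null or empty meaning "not set upstream") and inspects at most `sampleSize` of them.

```java
import java.util.Arrays;
import java.util.List;

public class RoutingSampler {
    // Hypothetical helper: returns true when none of the first sampleSize
    // routing values is set, i.e. the sink appears not to rely on upstream
    // routing and the per-bulk routing could be applied.
    public static boolean noUpstreamRouting(List<String> routings, int sampleSize) {
        int n = Math.min(sampleSize, routings.size());
        for (int i = 0; i < n; i++) {
            String r = routings.get(i);
            if (r != null && !r.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(noUpstreamRouting(Arrays.<String>asList(null, null, null), 100)); // true
        System.out.println(noUpstreamRouting(Arrays.asList(null, "user-7", null), 100));     // false
    }
}
```

The trade-off is the sample size: a small sample is cheap but can miss a routed request (with `sampleSize = 1`, the second example above would return true), while checking the whole bulk is exact but touches every request.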
Q: Is the data evenly distributed?
A: Yes. We have been running this setup for a long time. Because each bulk's routing value is random and therefore uniformly distributed, the data in ES is also evenly distributed.
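A rough simulation of the uniformity argument: give each of 10,000 bulks a "<uuid>_<executionId>" routing value and count how many bulks land on each of 5 shards. As before, `String.hashCode()` modulo the shard count is a simplified stand-in for ES's Murmur3-based shard selection, and a seeded UUID replaces `UUID.randomUUID()` so the run is reproducible; the counts are per bulk, so they track document balance only if bulks are of similar size.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;

public class RoutingUniformity {
    // Simplified shard selection (assumption): stand-in for ES's Murmur3 routing hash.
    public static int shardFor(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Number of bulks landing on each shard when every bulk gets its own
    // "<uuid>_<executionId>" routing value.
    public static int[] bulksPerShard(int numBulks, int numPrimaryShards, long seed) {
        Random rnd = new Random(seed);
        int[] counts = new int[numPrimaryShards];
        for (long executionId = 0; executionId < numBulks; executionId++) {
            // Seeded UUID for reproducibility; the proposal uses UUID.randomUUID().
            String routing = new UUID(rnd.nextLong(), rnd.nextLong()) + "_" + executionId;
            counts[shardFor(routing, numPrimaryShards)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(bulksPerShard(10_000, 5, 42L)));
    }
}
```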