Yingjie Cao created FLINK-11859:
-----------------------------------
Summary: Improve SpanningRecordSerializer performance by
serializing record length to serialization buffer directly
Key: FLINK-11859
URL: https://issues.apache.org/jira/browse/FLINK-11859
Project: Flink
Issue Type: Improvement
Reporter: Yingjie Cao
Assignee: Yingjie Cao
In the current implementation of SpanningRecordSerializer, the length of a
record is serialized to an intermediate length buffer and then copied to the
target buffer. The length field can instead be serialized directly to the
data buffer (serializationBuffer), which avoids the separate copy of the
length buffer. Although the total number of bytes copied remains unchanged,
this saves one copy operation per record, and that extra copy incurs high
overhead when records are small. The flink-benchmarks results show that the
change improves performance; the test results are as follows.
Result with the optimization:
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | |
Result without the optimization:
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|20788.34364|1146.470652|ops/ms|1000,1ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|24412.00941|981.98882|ops/ms|1000,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8284.336114|177.482373|ops/ms|1000,100ms,SSL| |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|7846.800667|127.321584|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4837.270101|94.519852|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|591.304589|5.324132|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|446.605784|2.53677|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2878.885056|64.035709|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.705601|0.164959|ms/op| | |
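The two tables can be compared row by row by computing the relative change
in Score. The helper below is only an illustrative sketch; the class and
method names are hypothetical and are not part of flink-benchmarks. Note that
for thrpt rows (ops/ms) a positive change is an improvement, while for the
avgt row (ms/op) lower is better, and that a difference smaller than the
reported Score Error should not be read as significant.

```java
/** Hypothetical helper for comparing a row of the two result tables. */
final class RelativeChangeSketch {

    private RelativeChangeSketch() {}

    /**
     * Returns the change of scoreWith relative to scoreWithout, in percent.
     * A positive value means the score with the optimization is higher.
     */
    static double percentChange(double scoreWithout, double scoreWith) {
        return (scoreWith - scoreWithout) / scoreWithout * 100.0;
    }
}
```

For example, passing the networkBroadcastThroughput scores (483.388618
without, 564.690627 with) gives a gain of roughly 16.8%, while the
globalWindow scores (4837.270101 without, 4573.620126 with) give a negative
value, so not every row moves in the same direction.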
The optimization is especially useful for small records.
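To illustrate the idea, here is a minimal sketch using plain
java.nio.ByteBuffer rather than Flink's own classes. It is not the actual
SpanningRecordSerializer code: the class name, the method names, and the use
of a byte array as a stand-in for an already serialized record are all
assumptions made for the example. Both methods produce the same bytes in the
target, but the second reserves four bytes at the front of the serialization
buffer, patches the length in afterwards, and needs a single copy.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration; not Flink's SpanningRecordSerializer. */
final class LengthPrefixSketch {

    private LengthPrefixSketch() {}

    /**
     * Length is written to its own 4-byte buffer, so two copies into the
     * target are needed. Returns the number of copy calls made.
     */
    static int copyWithLengthBuffer(byte[] record, ByteBuffer target) {
        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
        lengthBuffer.putInt(0, record.length);   // absolute put, position stays 0
        ByteBuffer dataBuffer = ByteBuffer.wrap(record);
        target.put(lengthBuffer);                // copy 1: the 4-byte length
        target.put(dataBuffer);                  // copy 2: the record bytes
        return 2;
    }

    /**
     * Length is written directly into the serialization buffer, in front of
     * the record bytes, so one copy into the target is enough.
     */
    static int copyWithDirectLength(byte[] record, ByteBuffer target) {
        ByteBuffer serializationBuffer = ByteBuffer.allocate(4 + record.length);
        serializationBuffer.position(4);         // reserve space for the length
        serializationBuffer.put(record);         // stands in for serializing the record
        int length = serializationBuffer.position() - 4;
        serializationBuffer.putInt(0, length);   // patch the length into the reserved slot
        serializationBuffer.flip();
        target.put(serializationBuffer);         // single copy: length + record
        return 1;
    }
}
```

The saved copy moves only four bytes, so its cost is mostly fixed per-call
overhead. That overhead is paid once per record, which is consistent with
the gain being most visible when records are small.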