shuttie opened a new pull request #10358: [FLINK-14346] [serialization] faster 
implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358
 
 
   ## What is the purpose of the change
   
   This PR implements a set of performance optimizations for String 
serialization and de-serialization. While running a set of state-heavy 
streaming jobs, we noticed that Flink spends quite a lot of CPU time (~30-40%) 
doing String encoding and decoding in two places: while transferring messages 
between the nodes, and while loading and writing objects into the state store.
   
   We ran a simple benchmark of String read/write operations against the 
JDK's default `DataOutput.writeUTF` and noted a significant performance 
difference between the Flink implementation and the JDK one.
   
   The difference was roughly 3x on decoding and 2x on encoding for 
16-character ascii strings:
   ```
   [info] Benchmark                                        (length)  (stringType)  Mode  Cnt     Score    Error  Units
   [info] StringDeserializerBenchmark.deserializeDefault         16         ascii  avgt   25   251.321 ±  3.251  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK             16         ascii  avgt   25    77.147 ±  1.661  ns/op
   
   [info] StringSerializerBenchmark.serializeDefault             16         ascii  avgt   25    95.782 ±  0.261  ns/op
   [info] StringSerializerBenchmark.serializeJDK                 16         ascii  avgt   25    50.786 ±  1.677  ns/op
   ```
   For larger strings the gap widens further, to 7x and 4x respectively:
   ```
   [info] Benchmark                                        (length)  (stringType)  Mode  Cnt     Score    Error  Units
   [info] StringDeserializerBenchmark.deserializeDefault        128         ascii  avgt   25  1757.726 ±  3.312  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK            128         ascii  avgt   25   263.445 ±  6.912  ns/op
   
   [info] StringSerializerBenchmark.serializeDefault            128         ascii  avgt   25   670.627 ±  2.807  ns/op
   [info] StringSerializerBenchmark.serializeJDK                128         ascii  avgt   25   151.789 ±  7.295  ns/op
   ```
   
   However, the JDK's `DataOutput.writeUTF` cannot be used as a drop-in 
serializer replacement, because it is incompatible with the current Flink 
binary format for strings. It also cannot write strings whose encoded form 
exceeds 65535 bytes (64 KB), as it uses a 2-byte length prefix.
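   
   The effect of the 2-byte length prefix can be observed directly. The 
snippet below is a small illustration, not part of this PR; the class and 
method names are made up for the example. It relies only on the documented 
behaviour of `DataOutputStream.writeUTF`, which throws 
`UTFDataFormatException` once the encoded string exceeds 65535 bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo class, not part of Flink or of this PR.
class WriteUtfLimitDemo {

    /** Returns true if writeUTF accepts an ascii string of the given length. */
    static boolean fitsInWriteUtf(int asciiLength) {
        char[] chars = new char[asciiLength];
        Arrays.fill(chars, 'a'); // each 'a' is encoded as exactly one byte
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        try {
            out.writeUTF(new String(chars));
            return true;
        } catch (UTFDataFormatException e) {
            // The encoded length does not fit into the 2-byte length prefix.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("65535 chars accepted: " + fitsInWriteUtf(65_535));
        System.out.println("65536 chars accepted: " + fitsInWriteUtf(65_536));
    }
}
```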
   
   Comparing the two implementations, the main difference is that the JDK 
uses intermediate buffering, whereas Flink takes an iterative approach. This 
buffering enables two important optimizations: 
   * HotSpot can unroll the whole encoding/decoding loop
   * as there are no data dependencies between characters anymore, the CPU can 
achieve much higher internal parallelism and spend less time stalled.
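   
   To make the contrast concrete, here is a minimal sketch of the two styles. 
It is not the code from this PR and it does not claim to reproduce Flink's 
exact wire format: it assumes a simplified layout (character count followed by 
the characters, each written as little-endian 7-bit groups with a continuation 
bit), and all class and method names are hypothetical. The iterative variant 
pushes every byte through `DataOutput` one at a time, while the buffered 
variant fills a local `byte[]` and hands it over in a single call.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch; simplified format, not Flink's actual StringValue code.
class BufferedVarlenSketch {
    private static final int HIGH_BIT = 0x80;

    /** Iterative style: each 7-bit group is written straight to the output. */
    static void writeIterative(CharSequence cs, DataOutput out) throws IOException {
        int len = cs.length();
        while (len >= HIGH_BIT) { out.write(len | HIGH_BIT); len >>>= 7; }
        out.write(len);
        for (int i = 0; i < cs.length(); i++) {
            int c = cs.charAt(i);
            while (c >= HIGH_BIT) { out.write(c | HIGH_BIT); c >>>= 7; }
            out.write(c);
        }
    }

    /** Buffered style: encode into a local array, then write it in one call. */
    static void writeBuffered(CharSequence cs, DataOutput out) throws IOException {
        int strLen = cs.length();
        // Worst case: 5 bytes for the int length, 3 bytes per 16-bit char.
        // (A real implementation would guard against overflow for huge strings.)
        byte[] buf = new byte[5 + 3 * strLen];
        int pos = 0;
        int len = strLen;
        while (len >= HIGH_BIT) { buf[pos++] = (byte) (len | HIGH_BIT); len >>>= 7; }
        buf[pos++] = (byte) len;
        for (int i = 0; i < strLen; i++) {
            int c = cs.charAt(i);
            while (c >= HIGH_BIT) { buf[pos++] = (byte) (c | HIGH_BIT); c >>>= 7; }
            buf[pos++] = (byte) c;
        }
        out.write(buf, 0, pos);
    }

    /** Helper for experiments: encodes with either variant into a byte array. */
    static byte[] encode(CharSequence cs, boolean buffered) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            DataOutputStream out = new DataOutputStream(bytes);
            if (buffered) {
                writeBuffered(cs, out);
            } else {
                writeIterative(cs, out);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

   Both variants are meant to emit identical bytes; only the number of calls 
into the output differs, which is the property the buffering argument relies 
on.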
   
   We made a simple [POC](https://github.com/shuttie/flink-string-serializer) 
which changes the implementation of `StringValue.writeString` and 
`StringValue.readString` to introduce additional buffering, which 
significantly improves both encoding and decoding throughput on almost all 
workloads.
   
   There is also a property-based validation test suite which ensures that the 
old and new serialization code produce exactly the same byte sequences, and 
that round trips from old to new, new to old, and new to new produce the same 
results. We ran this suite with random data for ~1h and found no differences.
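   
   The general shape of such a check can be sketched with plain 
`java.util.Random`. This is an illustration only, not the test suite from the 
POC: the encoder and decoder are simplified stand-ins (character count plus 
7-bit groups per character), all names are hypothetical, and only the 
round-trip property is shown. Comparing the bytes of an old and a new encoder 
would follow the same loop.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical sketch of a property-style round-trip check.
class RoundTripPropertySketch {
    private static final int HIGH_BIT = 0x80;

    /** Stand-in encoder: char count, then each char, as 7-bit groups. */
    static byte[] encode(String s) {
        byte[] buf = new byte[5 + 3 * s.length()];
        int pos = 0;
        int len = s.length();
        while (len >= HIGH_BIT) { buf[pos++] = (byte) (len | HIGH_BIT); len >>>= 7; }
        buf[pos++] = (byte) len;
        for (int i = 0; i < s.length(); i++) {
            int c = s.charAt(i);
            while (c >= HIGH_BIT) { buf[pos++] = (byte) (c | HIGH_BIT); c >>>= 7; }
            buf[pos++] = (byte) c;
        }
        return Arrays.copyOf(buf, pos);
    }

    /** Stand-in decoder: the inverse of encode above. */
    static String decode(byte[] data) {
        int pos = 0;
        int len = 0;
        int shift = 0;
        int b;
        do {
            b = data[pos++] & 0xFF;
            len |= (b & 0x7F) << shift;
            shift += 7;
        } while (b >= HIGH_BIT);
        char[] chars = new char[len];
        for (int i = 0; i < len; i++) {
            int c = 0;
            shift = 0;
            do {
                b = data[pos++] & 0xFF;
                c |= (b & 0x7F) << shift;
                shift += 7;
            } while (b >= HIGH_BIT);
            chars[i] = (char) c;
        }
        return new String(chars);
    }

    /** Random string over the full 16-bit char range, up to maxLen chars. */
    static String randomString(Random rnd, int maxLen) {
        char[] chars = new char[rnd.nextInt(maxLen + 1)];
        for (int i = 0; i < chars.length; i++) {
            chars[i] = (char) rnd.nextInt(0x10000);
        }
        return new String(chars);
    }

    /** Returns true if decode(encode(s)) equals s for every generated string. */
    static boolean roundTripHolds(long seed, int iterations) {
        Random rnd = new Random(seed);
        for (int i = 0; i < iterations; i++) {
            String s = randomString(rnd, 256);
            if (!s.equals(decode(encode(s)))) {
                return false;
            }
        }
        return true;
    }
}
```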
   
   ## Benchmark results
   Full raw benchmark results are located 
[here](https://github.com/shuttie/flink-string-serializer/blob/master/README.txt).
 We benchmarked three string encoding/decoding implementations:
   * Flink's current `StringValue.writeString` and `StringValue.readString`
   * `StringValue.writeString` and `StringValue.readString` implementations 
from this PR
   * the JDK's `DataInput.readUTF` and `DataOutput.writeUTF`, as a 
binary-incompatible baseline
   
   The workload generator used these parameters:
   * string types: 7-bit us-ascii, Russian symbols (which are usually encoded 
as 14-bit varlen numbers), Chinese symbols (which are usually within the 
21-bit varlen number range) 
   * string lengths: 1, 4, 8, 16, 32, 64, 128 characters.
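   
   The bit widths above translate into encoded bytes per character. As a 
rough sketch, assuming each byte carries 7 payload bits plus one continuation 
bit (so 14 bits means two bytes and 21 bits means three), the helper below 
counts the bytes needed for a single character. The class and method names 
are hypothetical.

```java
// Hypothetical helper; assumes 7 payload bits per encoded byte.
class VarlenWidthSketch {

    /** Number of 7-bit groups (bytes) needed for one 16-bit char. */
    static int encodedBytes(char c) {
        int value = c;
        int bytes = 1;
        while (value >= 0x80) {
            value >>>= 7;
            bytes++;
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println("ascii 'a':        " + encodedBytes('a'));
        System.out.println("cyrillic U+044F:  " + encodedBytes('\u044f'));
        System.out.println("chinese U+4E2D:   " + encodedBytes('\u4e2d'));
    }
}
```

   Under that assumption an ascii character takes one byte, a Cyrillic one 
two bytes, and a common Chinese one three bytes, which is why the non-ascii 
workloads are more expensive per character.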
   
   ### Ascii strings
   ```
   [info] Benchmark                                        (length)  (stringType)  Mode  Cnt     Score    Error  Units
   [info] StringDeserializerBenchmark.deserializeDefault          1         ascii  avgt   25    46.603 ±  0.750  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved         1         ascii  avgt   25    51.074 ±  0.720  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK              1         ascii  avgt   25    63.402 ±  1.631  ns/op
   [info] StringSerializerBenchmark.serializeDefault              1         ascii  avgt   25    31.595 ±  0.489  ns/op
   [info] StringSerializerBenchmark.serializeImproved             1         ascii  avgt   25    33.454 ±  0.151  ns/op
   [info] StringSerializerBenchmark.serializeJDK                  1         ascii  avgt   25    34.721 ±  0.128  ns/op
   
   [info] StringDeserializerBenchmark.deserializeDefault         16         ascii  avgt   25   251.321 ±  3.251  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved        16         ascii  avgt   25    55.385 ±  1.176  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK             16         ascii  avgt   25    77.147 ±  1.661  ns/op
   [info] StringSerializerBenchmark.serializeDefault             16         ascii  avgt   25    95.782 ±  0.261  ns/op
   [info] StringSerializerBenchmark.serializeImproved            16         ascii  avgt   25    51.806 ±  0.180  ns/op
   [info] StringSerializerBenchmark.serializeJDK                 16         ascii  avgt   25    50.786 ±  1.677  ns/op
   
   [info] StringDeserializerBenchmark.deserializeDefault        128         ascii  avgt   25  1757.726 ±  3.312  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved       128         ascii  avgt   25   140.374 ±  1.006  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK            128         ascii  avgt   25   263.445 ±  6.912  ns/op
   [info] StringSerializerBenchmark.serializeDefault            128         ascii  avgt   25   670.627 ±  2.807  ns/op
   [info] StringSerializerBenchmark.serializeImproved           128         ascii  avgt   25   161.481 ±  2.798  ns/op
   [info] StringSerializerBenchmark.serializeJDK                128         ascii  avgt   25   151.789 ±  7.295  ns/op
   ```
   So for ascii strings:
   * on 1-char strings the new implementation is slightly slower than the old 
one (about 10% on decoding and 6% on encoding)
   * on 16-char strings encoding is **2x** faster, decoding is **5x** faster
   * on 128-char strings encoding is **4x** faster, decoding is **12x** faster
   
   ### Non-ascii strings
   
   ```
   [info] Benchmark                                        (length)  (stringType)  Mode  Cnt     Score    Error  Units
   [info] StringDeserializerBenchmark.deserializeDefault          1       chinese  avgt   25    77.743 ±  1.635  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved         1       chinese  avgt   25    78.814 ±  1.329  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK              1       chinese  avgt   25    66.005 ±  1.325  ns/op
   [info] StringSerializerBenchmark.serializeDefault              1       chinese  avgt   25    36.767 ±  3.662  ns/op
   [info] StringSerializerBenchmark.serializeImproved             1       chinese  avgt   25    36.382 ±  0.153  ns/op
   [info] StringSerializerBenchmark.serializeJDK                  1       chinese  avgt   25    36.845 ±  0.575  ns/op
   
   [info] StringDeserializerBenchmark.deserializeDefault         16       chinese  avgt   25   669.156 ±  3.021  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved        16       chinese  avgt   25   182.587 ±  4.843  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK             16       chinese  avgt   25   148.063 ±  2.204  ns/op
   [info] StringSerializerBenchmark.serializeDefault             16       chinese  avgt   25   244.844 ±  1.079  ns/op
   [info] StringSerializerBenchmark.serializeImproved            16       chinese  avgt   25    86.651 ±  1.316  ns/op
   [info] StringSerializerBenchmark.serializeJDK                 16       chinese  avgt   25    81.840 ±  1.976  ns/op
   
   [info] StringDeserializerBenchmark.deserializeDefault        128       chinese  avgt   25  5147.632 ± 30.068  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved       128       chinese  avgt   25   714.912 ± 26.240  ns/op
   [info] StringDeserializerBenchmark.deserializeJDK            128       chinese  avgt   25   738.740 ±  7.291  ns/op
   [info] StringSerializerBenchmark.serializeDefault            128       chinese  avgt   25  1889.786 ±  8.541  ns/op
   [info] StringSerializerBenchmark.serializeImproved           128       chinese  avgt   25   388.404 ±  2.511  ns/op
   [info] StringSerializerBenchmark.serializeJDK                128       chinese  avgt   25   401.011 ±  3.157  ns/op
   ```
   So for non-ascii chinese character strings:
   * on 1-char strings there is no performance difference.
   * on 16-char strings encoding is **3x** faster, decoding is **4x** faster
   * on 128-char strings encoding is **5x** faster, decoding is **7x** faster
   
   ## Brief change log
   
   - Replace the existing `StringValue.writeString` and 
`StringValue.readString` methods with improved ones.
   - Add an additional test case to StringSerializationTest to cover utf8 
encoding/decoding.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
StringSerializationTest.
   This change also adds test cases and can be verified as follows:
   
   - Add an additional test case to StringSerializationTest to cover utf8 
encoding/decoding.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
     - The serializers: (**yes** / no / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no 
/ don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
