shuttie commented on a change in pull request #10358: [FLINK-14346] 
[serialization] faster implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#discussion_r354734758
 
 

 ##########
 File path: flink-core/src/main/java/org/apache/flink/types/StringValue.java
 ##########
 @@ -759,57 +765,151 @@ public static String readString(DataInput in) throws 
IOException {
                        }
                        len |= curr << shift;
                }
-               
+
                // subtract one for the null length
                len -= 1;
-               
-               final char[] data = new char[len];
 
-               for (int i = 0; i < len; i++) {
-                       int c = in.readUnsignedByte();
-                       if (c < HIGH_BIT) {
-                               data[i] = (char) c;
-                       } else {
+               /* as we have no idea about byte-length of the serialized 
string, we cannot fully
+                * read it into memory buffer. But we can do it in an 
optimistic way:
+                * 1. In a happy case when the string is an us-ascii one, then 
byte_len == char_len
+                * 2. If we spot at least one character with code >= 127, then 
we reallocate the buffer
+                * to accommodate for the next characters.
+                */
+
+               // happily assume that the string is an 7 bit us-ascii one
+               byte[] buf = new byte[len];
+               in.readFully(buf);
+
+               final char[] data = new char[len];
+               int charPosition = 0;
+               int bufSize = len;
+               int bytePosition = 0;
+
+               while (charPosition < len) {
+                       if (bytePosition == bufSize) {
+                               // there is at least `char count - char 
position` bytes left in case if all the
+                               // remaining characters are 7 bit.
+                               int minRemainingChars = len - charPosition;
+                               // need to refill the buffer as we already 
reached its end.
+                               // we also reuse the old buffer, as it's 
capacity must always be >= minRemainingChars.
+                               in.readFully(buf, 0, minRemainingChars);
+                               bytePosition = 0;
+                               bufSize = minRemainingChars;
+                       }
+                       int c = buf[bytePosition++] & 255;
+                       // non 7-bit path
+                       if (c >= HIGH_BIT) {
                                int shift = 7;
                                int curr;
                                c = c & 0x7f;
-                               while ((curr = in.readUnsignedByte()) >= 
HIGH_BIT) {
+                               if (bytePosition == bufSize) {
+                                       int minRemainingChars = len - 
charPosition;
+                                       in.readFully(buf, 0, minRemainingChars);
+                                       bytePosition = 0;
+                                       bufSize = minRemainingChars;
+                               }
+
+                               while ((curr = buf[bytePosition++] & 255) >= 
HIGH_BIT) {
                                        c |= (curr & 0x7f) << shift;
                                        shift += 7;
+                                       if (bytePosition == bufSize) {
+                                               int minRemainingChars = len - 
charPosition;
+                                               // may need to refill the 
buffer if char bytes are split between the buffers.
+                                               in.readFully(buf, 0, 
minRemainingChars);
+                                               bytePosition = 0;
+                                               bufSize = minRemainingChars;
+                                       }
                                }
                                c |= curr << shift;
-                               data[i] = (char) c;
                        }
+                       data[charPosition++] = (char) c;
                }
-               
                return new String(data, 0, len);
        }
 
        public static final void writeString(CharSequence cs, DataOutput out) 
throws IOException {
                if (cs != null) {
+                       int strlen = cs.length();
+
                        // the length we write is offset by one, because a 
length of zero indicates a null value
-                       int lenToWrite = cs.length()+1;
+                       int lenToWrite = strlen + 1;
                        if (lenToWrite < 0) {
                                throw new 
IllegalArgumentException("CharSequence is too long.");
                        }
-       
-                       // write the length, variable-length encoded
-                       while (lenToWrite >= HIGH_BIT) {
-                               out.write(lenToWrite | HIGH_BIT);
-                               lenToWrite >>>= 7;
+
+
+                       // buffer size needed to contain all the characters
+                       int buflen = 0;
+                       if (strlen < 8) {
+                               // for small strings it's much faster to 
over-allocate a buffer sized for the worst-case scenario
+                               // when all the chars are encoded as 3-byte 
sequences, than iterating over the source string.
+                               buflen = strlen * 3;
+                       } else {
+                               for (int i = 0; i < strlen; i++) {
+                                       char c = cs.charAt(i);
+                                       if ((c & 0xff80) == 0) {
+                                               buflen++;
+                                       } else if (c > 0x07FF) {
+                                               buflen += 3;
+                                       } else {
+                                               buflen += 2;
+                                       }
+                               }
                        }
-                       out.write(lenToWrite);
-       
+                       byte[] buffer;
+                       int position = 0;
+                       // string is prefixed by it's variable length encoded 
size, which can take 1-5 bytes.
+                       if (lenToWrite < HIGH_BIT) {
+                               buflen += 1;
+                               buffer = new byte[buflen];
+                               buffer[position++] = (byte) lenToWrite;
+                       } else if (lenToWrite < HIGH_BIT14) {
+                               buflen += 2;
+                               buffer = new byte[buflen];
+                               buffer[position++] = (byte)(lenToWrite | 
HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 7));
+                       } else if (lenToWrite < HIGH_BIT21) {
+                               buflen += 3;
+                               buffer = new byte[buflen];
+                               buffer[position++] = (byte)(lenToWrite | 
HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 7) 
| HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 
14));
+                       } else if (lenToWrite < HIGH_BIT28) {
+                               buflen += 4;
+                               buffer = new byte[buflen];
+                               buffer[position++] = (byte)(lenToWrite | 
HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 7) 
| HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 14) 
| HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 
21));
+                       } else {
+                               buflen += 5;
+                               buffer = new byte[buflen];
+                               buffer[position++] = (byte)(lenToWrite | 
HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 7) 
| HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 14) 
| HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 21) 
| HIGH_BIT);
+                               buffer[position++] = (byte)((lenToWrite >>> 
28));
+                       }
+
+
                        // write the char data, variable length encoded
-                       for (int i = 0; i < cs.length(); i++) {
+                       for (int i = 0; i < strlen; i++) {
                                int c = cs.charAt(i);
-       
-                               while (c >= HIGH_BIT) {
-                                       out.write(c | HIGH_BIT);
-                                       c >>>= 7;
+
+                               // manual loop unroll, as it performs much 
better on jdk8
+                               if (c < HIGH_BIT) {
+                                       buffer[position++] = (byte)c;
+                               } else if (c < HIGH_BIT14) {
+                                       buffer[position++] = (byte)(c | 
HIGH_BIT);
+                                       buffer[position++] = (byte)((c >>> 7));
+                               } else {
 
 Review comment:
   Java characters (i.e. the char and Character types) use UTF-16 internally, 
meaning that each of them is always a single 16-bit number. However, some 
symbols (supplementary characters) do not fit into 16 bits and are represented 
as surrogate pairs, i.e. a pair of 16-bit chars is needed to encode them.
   
   So from the wire format point of view, when you do `"🌉".length()` you will 
get 2 (as there are two underlying 16-bit characters to encode this symbol), 
but with `"🌉".codePointCount(0,"🌉".length())` you will get 1 (as there is only 
one single symbol). But the string serializer does not operate on symbols or 
Unicode code points; it operates on the underlying 16-bit numbers, so it must 
handle surrogate pairs properly out of the box.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to