shuttie commented on a change in pull request #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString URL: https://github.com/apache/flink/pull/10358#discussion_r354734758
########## File path: flink-core/src/main/java/org/apache/flink/types/StringValue.java ########## @@ -759,57 +765,151 @@ public static String readString(DataInput in) throws IOException { } len |= curr << shift; } - + // subtract one for the null length len -= 1; - - final char[] data = new char[len]; - for (int i = 0; i < len; i++) { - int c = in.readUnsignedByte(); - if (c < HIGH_BIT) { - data[i] = (char) c; - } else { + /* as we have no idea about byte-length of the serialized string, we cannot fully + * read it into memory buffer. But we can do it in an optimistic way: + * 1. In a happy case when the string is an us-ascii one, then byte_len == char_len + * 2. If we spot at least one character with code >= 127, then we reallocate the buffer + * to accommodate for the next characters. + */ + + // happily assume that the string is an 7 bit us-ascii one + byte[] buf = new byte[len]; + in.readFully(buf); + + final char[] data = new char[len]; + int charPosition = 0; + int bufSize = len; + int bytePosition = 0; + + while (charPosition < len) { + if (bytePosition == bufSize) { + // there is at least `char count - char position` bytes left in case if all the + // remaining characters are 7 bit. + int minRemainingChars = len - charPosition; + // need to refill the buffer as we already reached its end. + // we also reuse the old buffer, as it's capacity must always be >= minRemainingChars. 
+ in.readFully(buf, 0, minRemainingChars); + bytePosition = 0; + bufSize = minRemainingChars; + } + int c = buf[bytePosition++] & 255; + // non 7-bit path + if (c >= HIGH_BIT) { int shift = 7; int curr; c = c & 0x7f; - while ((curr = in.readUnsignedByte()) >= HIGH_BIT) { + if (bytePosition == bufSize) { + int minRemainingChars = len - charPosition; + in.readFully(buf, 0, minRemainingChars); + bytePosition = 0; + bufSize = minRemainingChars; + } + + while ((curr = buf[bytePosition++] & 255) >= HIGH_BIT) { c |= (curr & 0x7f) << shift; shift += 7; + if (bytePosition == bufSize) { + int minRemainingChars = len - charPosition; + // may need to refill the buffer if char bytes are split between the buffers. + in.readFully(buf, 0, minRemainingChars); + bytePosition = 0; + bufSize = minRemainingChars; + } } c |= curr << shift; - data[i] = (char) c; } + data[charPosition++] = (char) c; } - return new String(data, 0, len); } public static final void writeString(CharSequence cs, DataOutput out) throws IOException { if (cs != null) { + int strlen = cs.length(); + // the length we write is offset by one, because a length of zero indicates a null value - int lenToWrite = cs.length()+1; + int lenToWrite = strlen + 1; if (lenToWrite < 0) { throw new IllegalArgumentException("CharSequence is too long."); } - - // write the length, variable-length encoded - while (lenToWrite >= HIGH_BIT) { - out.write(lenToWrite | HIGH_BIT); - lenToWrite >>>= 7; + + + // buffer size needed to contain all the characters + int buflen = 0; + if (strlen < 8) { + // for small strings it's much faster to over-allocate a buffer sized for the worst-case scenario + // when all the chars are encoded as 3-byte sequences, than iterating over the source string. 
+ buflen = strlen * 3; + } else { + for (int i = 0; i < strlen; i++) { + char c = cs.charAt(i); + if ((c & 0xff80) == 0) { + buflen++; + } else if (c > 0x07FF) { + buflen += 3; + } else { + buflen += 2; + } + } } - out.write(lenToWrite); - + byte[] buffer; + int position = 0; + // string is prefixed by it's variable length encoded size, which can take 1-5 bytes. + if (lenToWrite < HIGH_BIT) { + buflen += 1; + buffer = new byte[buflen]; + buffer[position++] = (byte) lenToWrite; + } else if (lenToWrite < HIGH_BIT14) { + buflen += 2; + buffer = new byte[buflen]; + buffer[position++] = (byte)(lenToWrite | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 7)); + } else if (lenToWrite < HIGH_BIT21) { + buflen += 3; + buffer = new byte[buflen]; + buffer[position++] = (byte)(lenToWrite | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 7) | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 14)); + } else if (lenToWrite < HIGH_BIT28) { + buflen += 4; + buffer = new byte[buflen]; + buffer[position++] = (byte)(lenToWrite | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 7) | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 14) | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 21)); + } else { + buflen += 5; + buffer = new byte[buflen]; + buffer[position++] = (byte)(lenToWrite | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 7) | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 14) | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 21) | HIGH_BIT); + buffer[position++] = (byte)((lenToWrite >>> 28)); + } + + // write the char data, variable length encoded - for (int i = 0; i < cs.length(); i++) { + for (int i = 0; i < strlen; i++) { int c = cs.charAt(i); - - while (c >= HIGH_BIT) { - out.write(c | HIGH_BIT); - c >>>= 7; + + // manual loop unroll, as it performs much better on jdk8 + if (c < HIGH_BIT) { + buffer[position++] = (byte)c; + } else if (c < HIGH_BIT14) { + buffer[position++] = (byte)(c | 
+ HIGH_BIT); + buffer[position++] = (byte)((c >>> 7)); + } else { Review comment: Java characters (i.e. the `char` and `Character` types) use UTF-16 encoding internally, meaning that each one is always a 16-bit number. However, some symbols lie outside the 16-bit range and are encoded as a surrogate pair, i.e. a pair of 16-bit characters. So from the wire format point of view, when you do `"🌉".length()` you will get 2 (as there are two underlying 16-bit characters encoding this symbol), but with `"🌉".codePointCount(0,"🌉".length())` you will get 1 (as there is only one symbol). The string serializer does not operate on symbols or Unicode code points; it operates on the underlying 16-bit numbers, so it should handle surrogate pairs properly out of the box. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services