PARQUET-642: Improve performance of ByteBuffer based read / write paths While trying out the newest Parquet version, we noticed that the changes to start using ByteBuffers: https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8 and https://github.com/apache/parquet-mr/commit/6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8 (mostly avro but a couple of ByteBuffer changes) caused our jobs to slow down a bit.
Read overhead: 4-6% (in MB_Millis). Write overhead: 6-10% (in MB_Millis). This seems to be due to the encoding / decoding of Strings in the [Binary class](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java): [toStringUsingUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L388) for reads and [encodeUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L236) for writes. With these changes we see around a 5% improvement in MB_Millis while running the job on our Hadoop cluster. Added some microbenchmark details to the JIRA. Note that I've left the behavior the same for the Avro write path - it still uses CharSequence and the Charset-based encoders. Author: Piyush Narang <[email protected]> Closes #347 from piyushnarang/bytebuffer-encoding-fix-pr and squashes the following commits: 43c5bdd [Piyush Narang] Keep avro on char sequence 2d50c8c [Piyush Narang] Update Binary approach 9e58237 [Piyush Narang] Proof of concept fixes Conflicts: parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java Resolution: Use String encoding/decoding where possible. 
Updated Avro to use fromCharSequence to avoid two copies Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/1535970c Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/1535970c Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/1535970c Branch: refs/heads/parquet-1.8.x Commit: 1535970c5072633a69413cdc070208d62c6e0431 Parents: 35cf1b4 Author: Piyush Narang <[email protected]> Authored: Thu Jun 30 09:50:59 2016 -0700 Committer: Ryan Blue <[email protected]> Committed: Mon Jan 9 16:54:54 2017 -0800 ---------------------------------------------------------------------- .../apache/parquet/avro/AvroWriteSupport.java | 2 +- .../java/org/apache/parquet/io/api/Binary.java | 63 +++++++++++++++++--- 2 files changed, 56 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1535970c/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index c75bb03..29dc9a1 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -283,7 +283,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { Utf8 utf8 = (Utf8) value; return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength()); } - return Binary.fromString(value.toString()); + return Binary.fromCharSequence((CharSequence) value); } private static GenericData getDataModel(Configuration conf) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1535970c/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java 
---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java index f88d740..a2f686d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java @@ -25,8 +25,12 @@ import java.io.OutputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetEncoder; import java.util.Arrays; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.ParquetEncodingException; import static org.apache.parquet.bytes.BytesUtils.UTF8; @@ -195,21 +199,49 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } - private static class FromStringBinary extends ByteArrayBackedBinary { + private static class FromStringBinary extends ByteBufferBackedBinary { public FromStringBinary(String value) { - // reused is false, because we do not - // hold on to the underlying bytes, - // and nobody else has a handle to them + // reused is false, because we do not hold on to the buffer after + // conversion, and nobody else has a handle to it super(encodeUTF8(value), false); } - private static byte[] encodeUTF8(String value) { + @Override + public String toString() { + return "Binary{\"" + toStringUsingUTF8() + "\"}"; + } + + private static ByteBuffer encodeUTF8(String value) { try { - return value.getBytes("UTF-8"); + return ByteBuffer.wrap(value.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { throw new ParquetEncodingException("UTF-8 not supported.", e); } } + } + + private static class FromCharSequenceBinary extends ByteBufferBackedBinary { + public FromCharSequenceBinary(CharSequence value) { + // reused is false, because we do 
not hold on to the buffer after + // conversion, and nobody else has a handle to it + super(encodeUTF8(value), false); + } + + private static final ThreadLocal<CharsetEncoder> ENCODER = + new ThreadLocal<CharsetEncoder>() { + @Override + protected CharsetEncoder initialValue() { + return UTF8.newEncoder(); + } + }; + + private static ByteBuffer encodeUTF8(CharSequence value) { + try { + return ENCODER.get().encode(CharBuffer.wrap(value)); + } catch (CharacterCodingException e) { + throw new ParquetEncodingException("UTF-8 not supported.", e); + } + } @Override public String toString() { @@ -340,7 +372,18 @@ abstract public class Binary implements Comparable<Binary>, Serializable { @Override public String toStringUsingUTF8() { - return UTF8.decode(value).toString(); + String ret; + if (value.hasArray()) { + try { + ret = new String(value.array(), value.arrayOffset() + value.position(), value.remaining(), "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new ParquetDecodingException("UTF-8 not supported"); + } + } else { + ret = UTF8.decode(value.duplicate()).toString(); + } + + return ret; } @Override @@ -472,10 +515,14 @@ abstract public class Binary implements Comparable<Binary>, Serializable { return fromReusedByteBuffer(value); // Assume producer intends to reuse byte[] } - public static Binary fromString(final String value) { + public static Binary fromString(String value) { return new FromStringBinary(value); } + public static Binary fromCharSequence(CharSequence value) { + return new FromCharSequenceBinary(value); + } + /** * @see {@link Arrays#hashCode(byte[])} * @param array
