Repository: parquet-mr Updated Branches: refs/heads/master 9c40a7bb3 -> 7f8e952ab
PARQUET-642: Improve performance of ByteBuffer based read / write paths. While trying out the newest Parquet version, we noticed that the changes to start using ByteBuffers (https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8 and https://github.com/apache/parquet-mr/commit/6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8, the latter mostly Avro changes but with a couple of ByteBuffer changes) caused our jobs to slow down a bit: read overhead was 4-6% and write overhead was 6-10% (both measured in MB_Millis). This seems to be due to the encoding / decoding of Strings in the [Binary class](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java): [toStringUsingUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L388) for reads and [encodeUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L236) for writes. With these changes we see around a 5% improvement in MB_Millis while running the job on our Hadoop cluster. I added some microbenchmark details to the JIRA. Note that I've left the behavior the same for the Avro write path - it still uses CharSequence and the Charset-based encoders.
Author: Piyush Narang <[email protected]> Closes #347 from piyushnarang/bytebuffer-encoding-fix-pr and squashes the following commits: 43c5bdd [Piyush Narang] Keep avro on char sequence 2d50c8c [Piyush Narang] Update Binary approach 9e58237 [Piyush Narang] Proof of concept fixes Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/7f8e952a Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/7f8e952a Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/7f8e952a Branch: refs/heads/master Commit: 7f8e952abc4d2fc4b96c97a51aa25fcf6ed8af02 Parents: 9c40a7b Author: Piyush Narang <[email protected]> Authored: Thu Jun 30 09:50:59 2016 -0700 Committer: Julien Le Dem <[email protected]> Committed: Thu Jun 30 09:50:59 2016 -0700 ---------------------------------------------------------------------- .../apache/parquet/avro/AvroWriteSupport.java | 2 +- .../java/org/apache/parquet/io/api/Binary.java | 73 ++++++++++++++------ 2 files changed, 53 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/7f8e952a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index 7fcd88e..460565b 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -364,7 +364,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> { Utf8 utf8 = (Utf8) value; return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength()); } - return Binary.fromString((CharSequence) value); + return Binary.fromCharSequence((CharSequence) value); } private 
static GenericData getDataModel(Configuration conf) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/7f8e952a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java index 30787f0..50b98c2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java @@ -31,6 +31,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.ParquetEncodingException; import static org.apache.parquet.bytes.BytesUtils.UTF8; @@ -214,7 +215,28 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } private static class FromStringBinary extends ByteBufferBackedBinary { - public FromStringBinary(CharSequence value) { + public FromStringBinary(String value) { + // reused is false, because we do not hold on to the buffer after + // conversion, and nobody else has a handle to it + super(encodeUTF8(value), false); + } + + @Override + public String toString() { + return "Binary{\"" + toStringUsingUTF8() + "\"}"; + } + + private static ByteBuffer encodeUTF8(String value) { + try { + return ByteBuffer.wrap(value.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new ParquetEncodingException("UTF-8 not supported.", e); + } + } + } + + private static class FromCharSequenceBinary extends ByteBufferBackedBinary { + public FromCharSequenceBinary(CharSequence value) { // reused is false, because we do not hold on to the buffer after // conversion, and nobody else has a handle to it super(encodeUTF8(value), false); @@ -226,12 +248,12 @@ abstract public class Binary implements Comparable<Binary>, 
Serializable { } private static final ThreadLocal<CharsetEncoder> ENCODER = - new ThreadLocal<CharsetEncoder>() { - @Override - protected CharsetEncoder initialValue() { - return StandardCharsets.UTF_8.newEncoder(); - } - }; + new ThreadLocal<CharsetEncoder>() { + @Override + protected CharsetEncoder initialValue() { + return StandardCharsets.UTF_8.newEncoder(); + } + }; private static ByteBuffer encodeUTF8(CharSequence value) { try { @@ -386,16 +408,26 @@ abstract public class Binary implements Comparable<Binary>, Serializable { @Override public String toStringUsingUTF8() { - int limit = value.limit(); - value.limit(offset+length); - int position = value.position(); - value.position(offset); - // no corresponding interface to read a subset of a buffer, would have to slice it - // which creates another ByteBuffer object or do what is done here to adjust the - // limit/offset and set them back after - String ret = UTF8.decode(value).toString(); - value.limit(limit); - value.position(position); + String ret; + if (value.hasArray()) { + try { + ret = new String(value.array(), value.arrayOffset() + offset, length, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new ParquetDecodingException("UTF-8 not supported"); + } + } else { + int limit = value.limit(); + value.limit(offset+length); + int position = value.position(); + value.position(offset); + // no corresponding interface to read a subset of a buffer, would have to slice it + // which creates another ByteBuffer object or do what is done here to adjust the + // limit/offset and set them back after + ret = UTF8.decode(value).toString(); + value.limit(limit); + value.position(position); + } + return ret; } @@ -555,12 +587,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } public static Binary fromString(String value) { - // this method is for binary backward-compatibility - return fromString((CharSequence) value); + return new FromStringBinary(value); } - public static 
Binary fromString(CharSequence value) { - return new FromStringBinary(value); + public static Binary fromCharSequence(CharSequence value) { + return new FromCharSequenceBinary(value); } /**
