[
https://issues.apache.org/jira/browse/BEAM-5439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662082#comment-16662082
]
Julien Tournay edited comment on BEAM-5439 at 10/24/18 10:36 AM:
-----------------------------------------------------------------
Hi [~iemejia] and [~lcwik].
I just tried the suggested fix and it does seem to improve the performance
quite significantly:
{code:java}
[info] KryoAtomicCoderBenchmark.stringScioStringCoderDecode       avgt  20  12023.466 ±  561.226  ns/op
[info] KryoAtomicCoderBenchmark.stringStringCoderReadFullyDecode  avgt  20  12212.836 ±  509.935  ns/op
[info] KryoAtomicCoderBenchmark.stringUTF8CoderDecode             avgt  20  15253.582 ± 1054.685  ns/op
{code}
* {{stringScioStringCoderDecode}} is the ScioStringCoder version (with {{DataInputStream}} removed completely)
* {{stringStringCoderReadFullyDecode}} is the version using {{ByteStreams.readFully(InputStream, byte[])}}
* {{stringUTF8CoderDecode}} is Beam's default {{StringUtf8Coder}}
I micro-benchmarked it by decoding a string containing "stringvalue1 stringvalue2 stringvalue3..." up to "stringvalue1000".
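For reference, the decode benchmarks have roughly the following shape (a minimal sbt-jmh style sketch of the baseline case; the class and field names are illustrative, not the actual KryoAtomicCoderBenchmark code):
{code:scala}
import java.io.ByteArrayInputStream
import java.util.concurrent.TimeUnit

import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.util.CoderUtils
import org.openjdk.jmh.annotations._

// Illustrative JMH harness: encode the test string once, then measure
// the average time of a single decode call from an in-memory stream.
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
class StringCoderDecodeBench {
  private val value: String =
    (1 to 1000).map(i => s"stringvalue$i").mkString(" ")
  private val encoded: Array[Byte] =
    CoderUtils.encodeToByteArray(StringUtf8Coder.of(), value)

  @Benchmark
  def stringUTF8CoderDecode(): String =
    StringUtf8Coder.of().decode(new ByteArrayInputStream(encoded))
}
{code}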
Here's the "fixed" coder implementation:
{code:scala}
import java.io.{InputStream, OutputStream}
import java.nio.charset.StandardCharsets
import com.google.common.base.Utf8
import com.google.common.io.ByteStreams
import org.apache.beam.sdk.coders.{AtomicCoder, CoderException}
import org.apache.beam.sdk.util.VarInt
import org.apache.beam.sdk.values.TypeDescriptor

private object StringCoderReadFully extends AtomicCoder[String] {
  override def decode(dis: InputStream): String = {
    val len = VarInt.decodeInt(dis)
    if (len < 0) {
      throw new CoderException("Invalid encoded string length: " + len)
    }
    val bytes = new Array[Byte](len)
    // readFully keeps reading until the buffer is completely filled (or throws),
    // unlike a single InputStream#read call.
    ByteStreams.readFully(dis, bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  override def encode(value: String, outStream: OutputStream): Unit = {
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    VarInt.encode(bytes.length, outStream)
    outStream.write(bytes)
  }

  override def verifyDeterministic(): Unit = ()
  override def consistentWithEquals(): Boolean = true

  private val TYPE_DESCRIPTOR = new TypeDescriptor[String] {}
  override def getEncodedTypeDescriptor(): TypeDescriptor[String] = TYPE_DESCRIPTOR

  override def getEncodedElementByteSize(value: String): Long = {
    if (value == null) {
      throw new CoderException("cannot encode a null String")
    }
    val size = Utf8.encodedLength(value)
    VarInt.getLength(size) + size
  }
}
{code}
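As a quick sanity check, the coder can be round-tripped with Beam's {{CoderUtils}} helpers (an illustrative snippet, not part of the benchmark):
{code:scala}
import org.apache.beam.sdk.util.CoderUtils

// Encode a value with the coder, then decode the resulting bytes back.
val original = "stringvalue1 stringvalue2 stringvalue3"
val bytes    = CoderUtils.encodeToByteArray(StringCoderReadFully, original)
val decoded  = CoderUtils.decodeFromByteArray(StringCoderReadFully, bytes)
assert(decoded == original)
{code}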
I'll submit a PR on beam.
> StringUtf8Coder is slower than expected
> ---------------------------------------
>
> Key: BEAM-5439
> URL: https://issues.apache.org/jira/browse/BEAM-5439
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.6.0
> Reporter: Julien Tournay
> Assignee: Julien Tournay
> Priority: Major
> Labels: performance
>
> While working on Scio's next version, I noticed that {{StringUtf8Coder}} is
> slower than expected.
> I wrote a small micro-benchmark using {{jmh}} that serialises a (Scala) List
> of 1000 Strings using a custom {{Coder[List[_]]}}. While profiling it, I
> noticed that a lot of time is spent in
> {{java.io.DataInputStream.<init>(java.io.InputStream)}}.
> Looking at the code of {{StringUtf8Coder}}, the {{readString}} method reads
> bytes directly, so wrapping the input in a {{DataInputStream}} does not seem
> necessary.
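>
> For context, the decode path being described looks roughly like this (a simplified
> sketch of the pattern, not a copy of Beam's source; the per-element {{DataInputStream}}
> allocation is what shows up in the profile):
> {code:scala}
> import java.io.{DataInputStream, InputStream}
> import java.nio.charset.StandardCharsets
> import org.apache.beam.sdk.util.VarInt
>
> // Assumed sketch of the StringUtf8Coder-style decode path: the incoming
> // InputStream is wrapped in a fresh DataInputStream for every element.
> def decodeViaDataInputStream(in: InputStream): String = {
>   val dis = new DataInputStream(in) // allocated per decoded element
>   val len = VarInt.decodeInt(dis)
>   val bytes = new Array[Byte](len)
>   dis.readFully(bytes)
>   new String(bytes, StandardCharsets.UTF_8)
> }
> {code}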
> I replaced {{StringUtf8Coder}} with a {{Coder[String]}} implementation (in
> Scala) that is essentially the same as {{StringUtf8Coder}} but does not use
> {{DataInputStream}}.
>
> {code:scala}
> import java.io.{InputStream, OutputStream}
> import java.nio.charset.StandardCharsets
> import com.google.common.base.Utf8
> import org.apache.beam.sdk.coders.{AtomicCoder, CoderException}
> import org.apache.beam.sdk.util.VarInt
> import org.apache.beam.sdk.values.TypeDescriptor
>
> private object ScioStringCoder extends AtomicCoder[String] {
>   override def decode(dis: InputStream): String = {
>     val len = VarInt.decodeInt(dis)
>     if (len < 0) {
>       throw new CoderException("Invalid encoded string length: " + len)
>     }
>     val bytes = new Array[Byte](len)
>     dis.read(bytes) // reads up to bytes.length bytes in a single call
>     new String(bytes, StandardCharsets.UTF_8)
>   }
>
>   override def encode(value: String, outStream: OutputStream): Unit = {
>     val bytes = value.getBytes(StandardCharsets.UTF_8)
>     VarInt.encode(bytes.length, outStream)
>     outStream.write(bytes)
>   }
>
>   override def verifyDeterministic(): Unit = ()
>   override def consistentWithEquals(): Boolean = true
>
>   private val TYPE_DESCRIPTOR = new TypeDescriptor[String] {}
>   override def getEncodedTypeDescriptor(): TypeDescriptor[String] = TYPE_DESCRIPTOR
>
>   override def getEncodedElementByteSize(value: String): Long = {
>     if (value == null) {
>       throw new CoderException("cannot encode a null String")
>     }
>     val size = Utf8.encodedLength(value)
>     VarInt.getLength(size) + size
>   }
> }
> {code}
>
> Using that {{Coder}} is about 27% faster than {{StringUtf8Coder}}. I've added
> the jmh output in "Docs Text".
> Is there any particular reason to use {{DataInputStream}}?
> Do you think we can remove it to make {{StringUtf8Coder}} more efficient?
>