[ 
https://issues.apache.org/jira/browse/BEAM-5439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662082#comment-16662082
 ] 

Julien Tournay edited comment on BEAM-5439 at 10/24/18 10:36 AM:
-----------------------------------------------------------------

Hi [~iemejia] and [~lcwik].

I just tried the suggested fix and it does seem to improve the performance 
quite significantly:
{code:java}
[info] KryoAtomicCoderBenchmark.stringScioStringCoderDecode       avgt   20  
12023.466 ±  561.226  ns/op
[info] KryoAtomicCoderBenchmark.stringStringCoderReadFullyDecode  avgt   20  
12212.836 ±  509.935  ns/op
[info] KryoAtomicCoderBenchmark.stringUTF8CoderDecode             avgt   20  
15253.582 ± 1054.685  ns/op
{code}

* {{stringScioStringCoderDecode}} is the version of ScioStringCoder (with 
DataInputStream removed completely)
* {{stringStringCoderReadFullyDecode}} is the version using 
{{ByteStreams.readFully(InputStream, byte[])}}
* {{stringUTF8CoderDecode}} is Beam's default coder.

I micro benchmarked it by decoding a string containing "stringvalue1 
stringvalue2 stringvalue3..." up until "stringvalue1000".

Here's the "fixed" coder implementation:

{code:scala}
private final object StringCoderReadFully extends AtomicCoder[String] {
  import org.apache.beam.sdk.coders.CoderException
  import org.apache.beam.sdk.util.VarInt
  import java.nio.charset.StandardCharsets
  import org.apache.beam.sdk.values.TypeDescriptor
  import com.google.common.base.Utf8

  def decode(dis: InputStream): String = {
    val len = VarInt.decodeInt(dis)
    if (len < 0) {
      throw new CoderException("Invalid encoded string length: " + len)
    }
    val bytes = new Array[Byte](len)
    com.google.common.io.ByteStreams.readFully(dis, bytes)
    return new String(bytes, StandardCharsets.UTF_8)
  }

  def encode(value: String, outStream: OutputStream): Unit = {
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    VarInt.encode(bytes.length, outStream)
    outStream.write(bytes)
  }

  override def verifyDeterministic() = ()
  override def consistentWithEquals() = true
  private val TYPE_DESCRIPTOR = new TypeDescriptor[String] {}
  override def getEncodedTypeDescriptor() = TYPE_DESCRIPTOR
  override def getEncodedElementByteSize(value: String) = {
    if (value == null) {
      throw new CoderException("cannot encode a null String")
    }
    val size = Utf8.encodedLength(value)
    VarInt.getLength(size) + size
  }
}
{code}

I'll submit a PR on beam.



was (Author: jto):
Hi [~iemejia] and [~lcwik].

I just tried the suggested fix and it does seem to improve the performance 
quite significantly:
{code:java}
KryoAtomicCoderBenchmark.stringScioStringCoderDecode           avgt   20  
12023.466 ±  561.226  ns/op 
KryoAtomicCoderBenchmark.stringStringCoderReadFullyDecode  avgt   20  12212.836 
±  509.935  ns/op
KryoAtomicCoderBenchmark.stringUTF8CoderDecode                   avgt   20  
15253.582 ± 1054.685  ns/op
{code}

{{stringScioStringCoderDecode}} is the version of ScioStringCoder (with 
DataInputStream removed completely)
{{stringStringCoderReadFullyDecode}} is the version using 
{{ByteStreams.readFully(InputStream, byte[])}}
{{stringUTF8CoderDecode}} is Beam's default coder.

I micro benchmarked it by decoding a string containing "stringvalue1 
stringvalue2 stringvalue3..." up until "stringvalue1000".

Here's the "fixed" coder implementation:

{code:scala}
private final object StringCoderReadFully extends AtomicCoder[String] {
  import org.apache.beam.sdk.coders.CoderException
  import org.apache.beam.sdk.util.VarInt
  import java.nio.charset.StandardCharsets
  import org.apache.beam.sdk.values.TypeDescriptor
  import com.google.common.base.Utf8

  def decode(dis: InputStream): String = {
    val len = VarInt.decodeInt(dis)
    if (len < 0) {
      throw new CoderException("Invalid encoded string length: " + len)
    }
    val bytes = new Array[Byte](len)
    com.google.common.io.ByteStreams.readFully(dis, bytes)
    return new String(bytes, StandardCharsets.UTF_8)
  }

  def encode(value: String, outStream: OutputStream): Unit = {
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    VarInt.encode(bytes.length, outStream)
    outStream.write(bytes)
  }

  override def verifyDeterministic() = ()
  override def consistentWithEquals() = true
  private val TYPE_DESCRIPTOR = new TypeDescriptor[String] {}
  override def getEncodedTypeDescriptor() = TYPE_DESCRIPTOR
  override def getEncodedElementByteSize(value: String) = {
    if (value == null) {
      throw new CoderException("cannot encode a null String")
    }
    val size = Utf8.encodedLength(value)
    VarInt.getLength(size) + size
  }
}
{code}

I'll submit a PR on beam.


> StringUtf8Coder is slower than expected
> ---------------------------------------
>
>                 Key: BEAM-5439
>                 URL: https://issues.apache.org/jira/browse/BEAM-5439
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.6.0
>            Reporter: Julien Tournay
>            Assignee: Julien Tournay
>            Priority: Major
>              Labels: performance
>
> While working on Scio's next version, I noticed that {{StringUtf8Coder}} is 
> slower than expected.
> I wrote a small micro-benchmark using {{jmh}} that serialises a (scala) List 
> of a 1000 Strings using a custom {{Coder[List[_]]}}. While profiling it, I 
> noticed that a lot of time is spent in 
> {{java.io.DataInputStream.<init>(java.io.InputStream)}}.
> Looking into the code for 
> {{StringUtf8Coder}}, the {{readString}} method is directly reading bytes. It 
> therefore does not seem that a {{DataInputStream}} is necessary.
> I replaced {{StringUtf8Coder}} with a {{Coder[String]}} implementation (in 
> Scala), that is essentially the same as {{StringUtf8Coder}} but is not using 
> {{DataInputStream}}.
>  
> {code:scala}
> private final object ScioStringCoder extends AtomicCoder[String] {
>   import org.apache.beam.sdk.util.VarInt
>   import java.nio.charset.StandardCharsets
>   import org.apache.beam.sdk.values.TypeDescriptor
>   import com.google.common.base.Utf8
>   def decode(dis: InputStream): String = {
>     val len = VarInt.decodeInt(dis)
>     if (len < 0) {
>       throw new CoderException("Invalid encoded string length: " + len)
>     }
>     val bytes = new Array[Byte](len)
>     dis.read(bytes)
>     return new String(bytes, StandardCharsets.UTF_8)
>   }
>   def encode(value: String, outStream: OutputStream): Unit = {
>     val bytes = value.getBytes(StandardCharsets.UTF_8)
>     VarInt.encode(bytes.length, outStream)
>     outStream.write(bytes)
>   }
>   override def verifyDeterministic() = ()
>   override def consistentWithEquals() = true
>   private val TYPE_DESCRIPTOR = new TypeDescriptor[String] {}
>   override def getEncodedTypeDescriptor() = TYPE_DESCRIPTOR
>   override def getEncodedElementByteSize(value: String) = {
>     if (value == null) {
>       throw new CoderException("cannot encode a null String")
>     }
>     val size = Utf8.encodedLength(value)
>     VarInt.getLength(size) + size
>   }
> }
> {code}
>  
> Using that {{Coder}} is about 27% faster than {{StringUtf8Coder}}. I've added 
> the jmh output in "Docs Text"
> Is there any particular reason to use {{DataInputStream}}?
> Do you think we can remove that to make {{StringUtf8Coder}} more efficient?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to