This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dac615  KAFKA-7386: streams-scala should not cache serdes (#5622)
9dac615 is described below

commit 9dac615d228c5b3464c6322aea9f9ce70f9ef37b
Author: John Roesler <[email protected]>
AuthorDate: Tue Sep 11 18:17:47 2018 -0500

    KAFKA-7386: streams-scala should not cache serdes (#5622)
    
    Currently, scala.Serdes.String, for example, invokes Serdes.String() once 
and caches the result.
    
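    However, the implementation of the String serde has a non-empty configure 
method whose behavior depends on whether it is used as a key or a value serde. 
So we won't get correct execution if we create one serde and use it for both 
keys and values. This change turns each cached `implicit val` into an 
`implicit def`, so every use obtains a fresh serde instance.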
    
    Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../org/apache/kafka/streams/scala/Serdes.scala    | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
index 8bfb083..02e5380 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -25,17 +25,17 @@ import org.apache.kafka.common.serialization.{Deserializer, 
Serde, Serializer, S
 import org.apache.kafka.streams.kstream.WindowedSerdes
 
 object Serdes {
-  implicit val String: Serde[String] = JSerdes.String()
-  implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
-  implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
-  implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
-  implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = 
JSerdes.Bytes()
-  implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
-  implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
-  implicit val Double: Serde[Double] = 
JSerdes.Double().asInstanceOf[Serde[Double]]
-  implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
-  implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
-  implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def String: Serde[String] = JSerdes.String()
+  implicit def Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+  implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = 
JSerdes.Bytes()
+  implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+  implicit def Double: Serde[Double] = 
JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+  implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
 
   implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new 
WindowedSerdes.TimeWindowedSerde[T]()
   implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] 
=

Reply via email to