This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2bcfbe2d547c8a76c4660bfcc7873845fa2f262e
Author: Alex Kokachev <[email protected]>
AuthorDate: Fri Nov 15 08:12:08 2019 +1100

    KAFKA-9011: Removed multiple calls to supplier.get() in order to avoid 
multiple transformer instances being created. (#7685)
    
    This is a followup PR for #7520 to address issue of multiple calls to get() 
as it was pointed out by @bbejeck in #7520 (comment)
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../streams/scala/FunctionsCompatConversions.scala | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
index 26756e0..e8420f4 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
@@ -126,23 +126,27 @@ private[scala] object FunctionsCompatConversions {
   implicit class TransformerSupplierAsJava[K, V, VO](val supplier: 
TransformerSupplier[K, V, Iterable[VO]])
       extends AnyVal {
     def asJava: TransformerSupplier[K, V, JIterable[VO]] = new 
TransformerSupplier[K, V, JIterable[VO]] {
-      override def get(): Transformer[K, V, JIterable[VO]] =
+      override def get(): Transformer[K, V, JIterable[VO]] = {
+        val innerTransformer = supplier.get()
         new Transformer[K, V, JIterable[VO]] {
-          override def transform(key: K, value: V): JIterable[VO] = 
supplier.get().transform(key, value).asJava
-          override def init(context: ProcessorContext): Unit = 
supplier.get().init(context)
-          override def close(): Unit = supplier.get().close()
+          override def transform(key: K, value: V): JIterable[VO] = 
innerTransformer.transform(key, value).asJava
+          override def init(context: ProcessorContext): Unit = 
innerTransformer.init(context)
+          override def close(): Unit = innerTransformer.close()
         }
+      }
     }
   }
   implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: 
ValueTransformerSupplier[V, Iterable[VO]])
       extends AnyVal {
     def asJava: ValueTransformerSupplier[V, JIterable[VO]] = new 
ValueTransformerSupplier[V, JIterable[VO]] {
-      override def get(): ValueTransformer[V, JIterable[VO]] =
+      override def get(): ValueTransformer[V, JIterable[VO]] = {
+        val innerTransformer = supplier.get()
         new ValueTransformer[V, JIterable[VO]] {
-          override def transform(value: V): JIterable[VO] = 
supplier.get().transform(value).asJava
-          override def init(context: ProcessorContext): Unit = 
supplier.get().init(context)
-          override def close(): Unit = supplier.get().close()
+          override def transform(value: V): JIterable[VO] = 
innerTransformer.transform(value).asJava
+          override def init(context: ProcessorContext): Unit = 
innerTransformer.init(context)
+          override def close(): Unit = innerTransformer.close()
         }
+      }
     }
   }
   implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO](
@@ -150,12 +154,14 @@ private[scala] object FunctionsCompatConversions {
   ) extends AnyVal {
     def asJava: ValueTransformerWithKeySupplier[K, V, JIterable[VO]] =
       new ValueTransformerWithKeySupplier[K, V, JIterable[VO]] {
-        override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] =
+        override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] = {
+          val innerTransformer = supplier.get()
           new ValueTransformerWithKey[K, V, JIterable[VO]] {
-            override def transform(key: K, value: V): JIterable[VO] = 
supplier.get().transform(key, value).asJava
-            override def init(context: ProcessorContext): Unit = 
supplier.get().init(context)
-            override def close(): Unit = supplier.get().close()
+            override def transform(key: K, value: V): JIterable[VO] = 
innerTransformer.transform(key, value).asJava
+            override def init(context: ProcessorContext): Unit = 
innerTransformer.init(context)
+            override def close(): Unit = innerTransformer.close()
           }
+        }
       }
   }
 }

Reply via email to