This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96cda0e  MINOR: Fix type inference on joins and aggregates (#5019)
96cda0e is described below

commit 96cda0e07ac4981a642c6b32fa543bcce78be769
Author: Joan Goyeau <[email protected]>
AuthorDate: Mon May 21 00:25:16 2018 +0100

    MINOR: Fix type inference on joins and aggregates (#5019)
    
    The type inference doesn't currently work for the join functions in Scala 
as it doesn't know yet the types of the given KStream[K, V] or KTable[K, V].
    
    The fix here is to curry the joiner function. I personally prefer this 
notation but this also means it differs more from the Java API.
    I believe the diff with the Java API is worth in this case as it's not only 
solving the type inference but also fits better the Scala way of coding (ex: 
fold).
    
    Moreover any Scala dev will bug and spend little time on these functions 
trying to understand why the type inference is not working and then get 
frustrated to be obliged to be explicit here where it's not harmful to be 
inferred.
    
    Reviewers: Debasish Ghosh <[email protected]>, Guozhang Wang 
<[email protected]>, Ismael Juma <[email protected]>
---
 .../streams/scala/kstream/KGroupedStream.scala     | 15 ++--
 .../streams/scala/kstream/KGroupedTable.scala      | 15 ++--
 .../kafka/streams/scala/kstream/KStream.scala      | 94 ++++++++++------------
 .../kafka/streams/scala/kstream/KTable.scala       | 36 ++++-----
 .../scala/kstream/SessionWindowedKStream.scala     | 14 ++--
 .../scala/kstream/TimeWindowedKStream.scala        | 12 +--
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  4 +-
 .../apache/kafka/streams/scala/TopologyTest.scala  |  2 +-
 8 files changed, 87 insertions(+), 105 deletions(-)

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index acffb1f..2e85bce 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -84,8 +84,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
    */ 
-  def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+  def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, 
ByteArrayKeyValueStore]): KTable[K, V] = {
 
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion 
doesn't take place
     // works perfectly with Scala 2.12 though
@@ -101,9 +100,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, 
V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */ 
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): 
KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -115,10 +113,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, 
V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */ 
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, 
materialized)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
+                                        materialized: Materialized[K, VR, 
ByteArrayKeyValueStore]): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, materialized)
 
   /**
    * Create a new [[SessionWindowedKStream]] instance that can be used to 
perform session windowed aggregations.
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 673ab5d..87a11c5 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -108,10 +108,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-                    adder: (K, V, VR) => VR,
-                    subtractor: (K, V, VR) => VR): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, adder.asAggregator, 
subtractor.asAggregator)
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: 
(K, V, VR) => VR): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, 
subtractor.asAggregator)
 
   /**
    * Aggregate the value of records of the original [[KTable]] that got 
[[KTable#groupBy]]
@@ -125,9 +123,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-                    adder: (K, V, VR) => VR,
-                    subtractor: (K, V, VR) => VR,
-                    materialized: Materialized[K, VR, 
ByteArrayKeyValueStore]): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, adder.asAggregator, 
subtractor.asAggregator, materialized)
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR,
+                                        subtractor: (K, V, VR) => VR,
+                                        materialized: Materialized[K, VR, 
ByteArrayKeyValueStore]): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, 
subtractor.asAggregator, materialized)
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 4b0dc2b..49d9fe4 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -47,9 +47,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains only those records that satisfy the 
given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filter`
    */ 
-  def filter(predicate: (K, V) => Boolean): KStream[K, V] = {
+  def filter(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filter(predicate.asPredicate)
-  }
 
   /**
    * Create a new [[KStream]] that consists all records of this stream which 
do <em>not</em> satisfy the given
@@ -59,9 +58,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains only those records that do 
<em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filterNot`
    */ 
-  def filterNot(predicate: (K, V) => Boolean): KStream[K, V] = {
+  def filterNot(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filterNot(predicate.asPredicate)
-  }
 
   /**
    * Set a new key (with possibly new type) for each input record.
@@ -73,9 +71,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with new key (possibly of 
different type) and unmodified value
    * @see `org.apache.kafka.streams.kstream.KStream#selectKey`
    */ 
-  def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] = {
+  def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] =
     inner.selectKey[KR](mapper.asKeyValueMapper)
-  }
 
   /**
    * Transform each record of the input stream into a new record in the output 
stream (both key and value type can be
@@ -101,9 +98,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new 
values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
    */ 
-  def mapValues[VR](mapper: V => VR): KStream[K, VR] = {
+  def mapValues[VR](mapper: V => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper)
-  }
 
   /**
    * Transform the value of each input record into a new value (with possible 
new type) of the output record.
@@ -114,9 +110,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new 
values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
    */ 
-  def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] = {
+  def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
    * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
@@ -145,9 +140,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with unmodified 
keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
    */ 
-  def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] = {
+  def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapper)
-  }
 
   /**
    * Create a new [[KStream]] by transforming the value of each record in this 
stream into zero or more values
@@ -161,9 +155,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with unmodified 
keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
    */ 
-  def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] = {
+  def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
    * Print the records of this KStream using the options provided by `Printed`
@@ -179,9 +172,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
    */
-  def foreach(action: (K, V) => Unit): Unit = {
+  def foreach(action: (K, V) => Unit): Unit =
     inner.foreach((k: K, v: V) => action(k, v))
-  }
 
   /**
    * Creates an array of {@code KStream} from this stream by branching the 
records in the original stream based on
@@ -191,9 +183,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return multiple distinct substreams of this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#branch`
    */
-  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] = {
+  def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => 
wrapKStream(kstream))
-  }
 
   /**
    * Materialize this stream to a topic and creates a new [[KStream]] from the 
topic using the `Produced` instance for 
@@ -304,9 +295,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */ 
   def transformValues[VR](valueTransformerSupplier: 
ValueTransformerSupplier[V, VR],
-                          stateStoreNames: String*): KStream[K, VR] = {
+                          stateStoreNames: String*): KStream[K, VR] =
     inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
-  }
 
   /**
    * Transform the value of each input record into a new value (with possible 
new type) of the output record.
@@ -335,9 +325,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param stateStoreNames   the names of the state store used by the 
processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */ 
-  def process(processorSupplier: () => Processor[K, V],
-    stateStoreNames: String*): Unit = {
-
+  def process(processorSupplier: () => Processor[K, V], stateStoreNames: 
String*): Unit = {
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, 
V] {
       override def get(): Processor[K, V] = processorSupplier()
     }
@@ -425,11 +413,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains join-records for each key and values 
computed by the given `joiner`,
    * one for each matched record-pair with the same key and within the joining 
window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#join`
-   */ 
-  def join[VO, VR](otherStream: KStream[K, VO],
+   */
+  def join[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, 
joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, 
joined)
 
   /**
    * Join records of this stream with another [[KTable]]'s records using inner 
equi join with 
@@ -444,10 +433,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains join-records for each key and values 
computed by the given `joiner`,
    * one for each matched record-pair with the same key 
    * @see `org.apache.kafka.streams.kstream.KStream#join`
-   */ 
-  def join[VT, VR](table: KTable[K, VT],
-    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
-      inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
+   */
+  def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit 
joined: Joined[K, V, VT]): KStream[K, VR] =
+    inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
 
   /**
    * Join records of this stream with `GlobalKTable`'s records using 
non-windowed inner equi join.
@@ -460,14 +448,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#join`
    */ 
-  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
-    joiner: (V, GV) => RV): KStream[K, RV] =
-      inner.join[GK, GV, RV](
-        globalKTable,
-        ((k: K, v: V) => keyValueMapper(k, v)).asKeyValueMapper,
-        ((v: V, gv: GV) => joiner(v, gv)).asValueJoiner
-      )
+    joiner: (V, GV) => RV
+  ): KStream[K, RV] =
+    inner.join[GK, GV, RV](
+      globalKTable,
+      ((k: K, v: V) => keyValueMapper(k, v)).asKeyValueMapper,
+      ((v: V, gv: GV) => joiner(v, gv)).asValueJoiner
+    )
 
   /**
    * Join records of this stream with another [[KStream]]'s records using 
windowed left equi join with 
@@ -484,10 +473,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                    one for each matched record-pair with the same key and 
within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[VO, VR](otherStream: KStream[K, VO],
+  def leftJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, 
joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, 
joined)
 
   /**
    * Join records of this stream with another [[KTable]]'s records using left 
equi join with 
@@ -503,9 +493,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                 one for each matched record-pair with the same key 
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[VT, VR](table: KTable[K, VT],
-    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
-      inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
+  def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit 
joined: Joined[K, V, VT]): KStream[K, VR] =
+    inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
 
   /**
    * Join records of this stream with `GlobalKTable`'s records using 
non-windowed left equi join.
@@ -518,12 +507,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+  def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
-    joiner: (V, GV) => RV): KStream[K, RV] = {
-
+    joiner: (V, GV) => RV
+  ): KStream[K, RV] =
     inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper, 
joiner.asValueJoiner)
-  }
 
   /**
    * Join records of this stream with another [[KStream]]'s records using 
windowed outer equi join with 
@@ -540,10 +528,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * one for each matched record-pair with the same key and within the joining 
window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#outerJoin`
    */ 
-  def outerJoin[VO, VR](otherStream: KStream[K, VO],
+  def outerJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, 
windows, joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, 
joined)
 
   /**
    * Merge this stream and the given stream into one larger stream.
@@ -567,7 +556,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#peek`
    */
-  def peek(action: (K, V) => Unit): KStream[K, V] = {
+  def peek(action: (K, V) => Unit): KStream[K, V] =
     inner.peek((k: K, v: V) => action(k, v))
-  }
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 65cf895..cff1844 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -225,7 +225,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param serialized    the `Serialized` instance used to specify `Serdes`
    * @return a [[KGroupedTable]] that contains the re-grouped records of the 
original [[KTable]]
    * @see `org.apache.kafka.streams.kstream.KTable#groupBy`
-   */ 
+   */
   def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: 
Serialized[KR, VR]): KGroupedTable[KR, VR] =
     inner.groupBy(selector.asKeyValueMapper, serialized)
 
@@ -237,9 +237,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
-   */ 
-  def join[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] 
=
     inner.join[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -252,10 +251,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
-   */ 
-  def join[VO, VR](other: KTable[K, VO],
+   */
+  def join[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
@@ -266,9 +266,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def leftJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def leftJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, 
VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -281,10 +280,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def leftJoin[VO, VR](other: KTable[K, VO],
+   */
+  def leftJoin[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
@@ -295,9 +295,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def outerJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def outerJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): 
KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -311,9 +310,10 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */ 
-  def outerJoin[VO, VR](other: KTable[K, VO],
+  def outerJoin[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 1e25554..fd2a565 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -45,10 +45,9 @@ class SessionWindowedKStream[K, V](val inner: 
SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR,
-    merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, 
merger.asMerger)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
+                                        merger: (K, VR, VR) => VR): 
KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, merger.asMerger)
 
   /**
    * Aggregate the values of records in this stream by the grouped key and 
defined `SessionWindows`.
@@ -61,11 +60,12 @@ class SessionWindowedKStream[K, V](val inner: 
SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
+  def aggregate[VR](initializer: => VR)(
     aggregator: (K, V, VR) => VR,
     merger: (K, VR, VR) => VR,
-    materialized: Materialized[K, VR, ByteArraySessionStore]): 
KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, 
merger.asMerger, materialized)
+    materialized: Materialized[K, VR, ByteArraySessionStore]
+  ): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, merger.asMerger, materialized)
 
   /**
    * Count the number of records in this stream by the grouped key into 
`SessionWindows`.
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index b00d025..a16c72b 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -44,9 +44,8 @@ class TimeWindowedKStream[K, V](val inner: 
TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): 
KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -58,10 +57,11 @@ class TimeWindowedKStream[K, V](val inner: 
TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
+  def aggregate[VR](initializer: => VR)(
     aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayWindowStore]): 
KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, 
materialized)
+    materialized: Materialized[K, VR, ByteArrayWindowStore]
+  ): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, materialized)
 
   /**
    * Count the number of records in this stream by the grouped key and the 
defined windows.
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 113458e..7aa0648 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -88,7 +88,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends JUnitSuite
       userClicksStream
 
         // Join the stream against the table.
-        .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if 
(region == null) "UNKNOWN" else region, clicks))
+        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) 
"UNKNOWN" else region, clicks))
 
         // Change the stream from <user> -> <region, clicks> to <region> -> 
<clicks>
         .map((_, regionWithClicks) => regionWithClicks)
@@ -180,7 +180,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends JUnitSuite
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers())
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
-    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath())
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot.getPath)
 
     streamsConfiguration
   }
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 9495ea7..e8b9f0f 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -142,7 +142,7 @@ class TopologyTest extends JUnitSuite {
   
       val clicksPerRegion: KTable[String, Long] =
         userClicksStream
-          .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if 
(region == null) "UNKNOWN" else region, clicks))
+          .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) 
"UNKNOWN" else region, clicks))
           .map((_, regionWithClicks) => regionWithClicks)
           .groupByKey
           .reduce(_ + _)

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to