[ 
https://issues.apache.org/jira/browse/KAFKA-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497374#comment-16497374
 ] 

ASF GitHub Bot commented on KAFKA-6936:
---------------------------------------

guozhangwang closed pull request #5066: KAFKA-6936: Implicit Materialized for 
aggregates
URL: https://github.com/apache/kafka/pull/5066
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index c32563f563a..0c384a1bad0 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -19,19 +19,22 @@
  */
 package org.apache.kafka.streams.scala
 
-import org.apache.kafka.streams.kstream.{KStream => KStreamJ,
-  KTable => KTableJ,
+import org.apache.kafka.streams.kstream.{
   KGroupedStream => KGroupedStreamJ,
+  KGroupedTable => KGroupedTableJ,
+  KStream => KStreamJ,
+  KTable => KTableJ,
   SessionWindowedKStream => SessionWindowedKStreamJ,
   TimeWindowedKStream => TimeWindowedKStreamJ,
-  KGroupedTable => KGroupedTableJ, _}
-
+  _
+}
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.common.serialization.Serde
-
 import scala.language.implicitConversions
 
+import org.apache.kafka.streams.processor.StateStore
+
 /**
  * Implicit conversions between the Scala wrapper objects and the underlying 
Java
  * objects.
@@ -70,6 +73,10 @@ object ImplicitConversions {
   implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], 
valueSerde: Serde[V]): Produced[K, V] =
     Produced.`with`(keySerde, valueSerde)
 
+  implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: 
Serde[K],
+                                                            valueSerde: 
Serde[V]): Materialized[K, V, S] =
+    Materialized.`with`[K, V, S](keySerde, valueSerde)
+
   implicit def joinedFromKeyValueOtherSerde[K, V, VO]
     (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: 
Serde[VO]): Joined[K, V, VO] =
     Joined.`with`(keySerde, valueSerde, otherValueSerde)
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 2e85bce91d1..0e5abfdd11b 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -36,18 +36,6 @@ import org.apache.kafka.streams.scala.FunctionConversions._
  */
 class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
 
-  /**
-   * Count the number of records in this stream by the grouped key.
-   *
-   * @return a [[KTable]] that contains "update" records with unmodified keys 
and `Long` values that
-   * represent the latest (rolling) count (i.e., number of records) for each 
key
-   * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
-   */ 
-  def count(): KTable[K, Long] = {
-    val c: KTable[K, java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
   /**
    * Count the number of records in this stream by the grouped key.
    * The result is written into a local `KeyValueStore` (which is basically an 
ever-updating materialized view)
@@ -57,24 +45,13 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, 
V]) {
    * @return a [[KTable]] that contains "update" records with unmodified keys 
and `Long` values that
    * represent the latest (rolling) count (i.e., number of records) for each 
key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
-   */ 
-  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): 
KTable[K, Long] = {
+   */
+  def count()(implicit materialized: Materialized[K, Long, 
ByteArrayKeyValueStore]): KTable[K, Long] = {
     val c: KTable[K, java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, 
ByteArrayKeyValueStore]])
     c.mapValues[Long](Long2long _)
   }
 
-  /**
-   * Combine the values of records in this stream by the grouped key.
-   *
-   * @param reducer   a function `(V, V) => V` that computes a new aggregate 
result. 
-   * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
-   */ 
-  def reduce(reducer: (V, V) => V): KTable[K, V] =
-    inner.reduce(reducer.asReducer)
-
   /**
    * Combine the values of records in this stream by the grouped key.
    *
@@ -83,38 +60,25 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, 
V]) {
    * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
-   */ 
-  def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, 
ByteArrayKeyValueStore]): KTable[K, V] = {
-
+   */
+  def reduce(reducer: (V, V) => V)(implicit materialized: Materialized[K, V, 
ByteArrayKeyValueStore]): KTable[K, V] =
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion 
doesn't take place
     // works perfectly with Scala 2.12 though
-    inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, materialized)
-  }
-
-  /**
-   * Aggregate the values of records in this stream by the grouped key.
-   *
-   * @param initializer   an `Initializer` that computes an initial 
intermediate aggregation result
-   * @param aggregator    an `Aggregator` that computes a new aggregate result
-   * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
-   */ 
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): 
KTable[K, VR] =
-    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
+    inner.reduce(reducer.asReducer, materialized)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
    *
    * @param initializer   an `Initializer` that computes an initial 
intermediate aggregation result
    * @param aggregator    an `Aggregator` that computes a new aggregate result
-   * @param materialized  an instance of `Materialized` used to materialize a 
state store. 
+   * @param materialized  an instance of `Materialized` used to materialize a 
state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
-   */ 
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
-                                        materialized: Materialized[K, VR, 
ByteArrayKeyValueStore]): KTable[K, VR] =
+   */
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, materialized)
 
   /**
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 87a11c5b835..99bc83e1ba0 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -35,19 +35,6 @@ import org.apache.kafka.streams.scala.FunctionConversions._
  */
 class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
 
-  /**
-   * Count number of records of the original [[KTable]] that got 
[[KTable#groupBy]] to
-   * the same key into a new instance of [[KTable]].
-   *
-   * @return a [[KTable]] that contains "update" records with unmodified keys 
and `Long` values that
-   * represent the latest (rolling) count (i.e., number of records) for each 
key
-   * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
-   */
-  def count(): KTable[K, Long] = {
-    val c: KTable[K, java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
   /**
    * Count number of records of the original [[KTable]] that got 
[[KTable#groupBy]] to
    * the same key into a new instance of [[KTable]].
@@ -57,7 +44,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * represent the latest (rolling) count (i.e., number of records) for each 
key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
    */
-  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): 
KTable[K, Long] = {
+  def count()(implicit materialized: Materialized[K, Long, 
ByteArrayKeyValueStore]): KTable[K, Long] = {
     val c: KTable[K, java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, 
ByteArrayKeyValueStore]])
     c.mapValues[Long](Long2long _)
@@ -69,30 +56,13 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    *
    * @param adder      a function that adds a new value to the aggregate result
    * @param subtractor a function that removed an old value from the aggregate 
result
+   * @param materialized  an instance of `Materialized` used to materialize a 
state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
    */
   def reduce(adder: (V, V) => V,
-             subtractor: (V, V) => V): KTable[K, V] =
-    // need this explicit asReducer for Scala 2.11 or else the SAM conversion 
doesn't take place
-    // works perfectly with Scala 2.12 though
-    inner.reduce(adder.asReducer, subtractor.asReducer)
-
-  /**
-   * Combine the value of records of the original [[KTable]] that got 
[[KTable#groupBy]]
-   * to the same key into a new instance of [[KTable]].
-   *
-   * @param adder      a function that adds a new value to the aggregate result
-   * @param subtractor a function that removed an old value from the aggregate 
result
-   * @param materialized  an instance of `Materialized` used to materialize a 
state store. 
-   * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
-   */
-  def reduce(adder: (V, V) => V,
-             subtractor: (V, V) => V,
-             materialized: Materialized[K, V, ByteArrayKeyValueStore]): 
KTable[K, V] =
+             subtractor: (V, V) => V)(implicit materialized: Materialized[K, 
V, ByteArrayKeyValueStore]): KTable[K, V] =
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion 
doesn't take place
     // works perfectly with Scala 2.12 though
     inner.reduce(adder.asReducer, subtractor.asReducer, materialized)
@@ -104,27 +74,13 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * @param initializer a function that provides an initial aggregate result 
value
    * @param adder       a function that adds a new record to the aggregate 
result
    * @param subtractor  an aggregator function that removed an old record from 
the aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a 
state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: 
(K, V, VR) => VR): KTable[K, VR] =
-    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, 
subtractor.asAggregator)
-
-  /**
-   * Aggregate the value of records of the original [[KTable]] that got 
[[KTable#groupBy]]
-   * to the same key into a new instance of [[KTable]] using default 
serializers and deserializers.
-   *
-   * @param initializer a function that provides an initial aggregate result 
value
-   * @param adder       a function that adds a new record to the aggregate 
result
-   * @param subtractor  an aggregator function that removed an old record from 
the aggregate result
-   * @param materialized  an instance of `Materialized` used to materialize a 
state store. 
-   * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
-   */
-  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR,
-                                        subtractor: (K, V, VR) => VR,
-                                        materialized: Materialized[K, VR, 
ByteArrayKeyValueStore]): KTable[K, VR] =
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: 
(K, V, VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.aggregate((() => initializer).asInitializer, adder.asAggregator, 
subtractor.asAggregator, materialized)
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index fd2a56564b8..ed41973c090 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -40,45 +40,18 @@ class SessionWindowedKStream[K, V](val inner: 
SessionWindowedKStreamJ[K, V]) {
    *
    * @param initializer    the initializer function
    * @param aggregator     the aggregator function
-   * @param sessionMerger  the merger function
+   * @param merger         the merger function
+   * @param materialized  an instance of `Materialized` used to materialize a 
state store.
    * @return a windowed [[KTable]] that contains "update" records with 
unmodified keys, and values that represent
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
   def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
-                                        merger: (K, VR, VR) => VR): 
KTable[Windowed[K], VR] =
-    inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, merger.asMerger)
-
-  /**
-   * Aggregate the values of records in this stream by the grouped key and 
defined `SessionWindows`.
-   *
-   * @param initializer    the initializer function
-   * @param aggregator     the aggregator function
-   * @param sessionMerger  the merger function
-   * @param materialized  an instance of `Materialized` used to materialize a 
state store. 
-   * @return a windowed [[KTable]] that contains "update" records with 
unmodified keys, and values that represent
-   * the latest (rolling) aggregate for each key within a window
-   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
-   */
-  def aggregate[VR](initializer: => VR)(
-    aggregator: (K, V, VR) => VR,
-    merger: (K, VR, VR) => VR,
-    materialized: Materialized[K, VR, ByteArraySessionStore]
+                                        merger: (K, VR, VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArraySessionStore]
   ): KTable[Windowed[K], VR] =
     inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, merger.asMerger, materialized)
 
-  /**
-   * Count the number of records in this stream by the grouped key into 
`SessionWindows`.
-   *
-   * @return a windowed [[KTable]] that contains "update" records with 
unmodified keys and `Long` values
-   * that represent the latest (rolling) count (i.e., number of records) for 
each key within a window
-   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
-   */
-  def count(): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
   /**
    * Count the number of records in this stream by the grouped key into 
`SessionWindows`.
    *
@@ -87,23 +60,12 @@ class SessionWindowedKStream[K, V](val inner: 
SessionWindowedKStreamJ[K, V]) {
    * that represent the latest (rolling) count (i.e., number of records) for 
each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
    */
-  def count(materialized: Materialized[K, Long, ByteArraySessionStore]): 
KTable[Windowed[K], Long] = {
+  def count()(implicit materialized: Materialized[K, Long, 
ByteArraySessionStore]): KTable[Windowed[K], Long] = {
     val c: KTable[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, 
ByteArraySessionStore]])
     c.mapValues[Long](Long2long _)
   }
 
-  /**
-   * Combine values of this stream by the grouped key into {@link 
SessionWindows}.
-   *
-   * @param reducer           a reducer function that computes a new aggregate 
result. 
-   * @return a windowed [[KTable]] that contains "update" records with 
unmodified keys, and values that represent
-   * the latest (rolling) aggregate for each key within a window
-   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
-   */
-  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
-    inner.reduce((v1, v2) => reducer(v1, v2))
-
   /**
    * Combine values of this stream by the grouped key into {@link 
SessionWindows}.
    *
@@ -113,7 +75,8 @@ class SessionWindowedKStream[K, V](val inner: 
SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, ByteArraySessionStore]): 
KTable[Windowed[K], V] =
+  def reduce(reducer: (V, V) => V)(
+    implicit materialized: Materialized[K, V, ByteArraySessionStore]
+  ): KTable[Windowed[K], V] =
     inner.reduce(reducer.asReducer, materialized)
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index a16c72b948e..9e31ab9a80a 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -40,41 +40,16 @@ class TimeWindowedKStream[K, V](val inner: 
TimeWindowedKStreamJ[K, V]) {
    *
    * @param initializer   an initializer function that computes an initial 
intermediate aggregation result
    * @param aggregator    an aggregator function that computes a new aggregate 
result
+   * @param materialized  an instance of `Materialized` used to materialize a 
state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): 
KTable[Windowed[K], VR] =
-    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
-
-  /**
-   * Aggregate the values of records in this stream by the grouped key.
-   *
-   * @param initializer   an initializer function that computes an initial 
intermediate aggregation result
-   * @param aggregator    an aggregator function that computes a new aggregate 
result
-   * @param materialized  an instance of `Materialized` used to materialize a 
state store. 
-   * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
-   */
-  def aggregate[VR](initializer: => VR)(
-    aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayWindowStore]
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArrayWindowStore]
   ): KTable[Windowed[K], VR] =
     inner.aggregate((() => initializer).asInitializer, 
aggregator.asAggregator, materialized)
 
-  /**
-   * Count the number of records in this stream by the grouped key and the 
defined windows.
-   *
-   * @return a [[KTable]] that contains "update" records with unmodified keys 
and `Long` values that
-   * represent the latest (rolling) count (i.e., number of records) for each 
key
-   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
-   */ 
-  def count(): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
   /**
    * Count the number of records in this stream by the grouped key and the 
defined windows.
    *
@@ -83,23 +58,12 @@ class TimeWindowedKStream[K, V](val inner: 
TimeWindowedKStreamJ[K, V]) {
    * represent the latest (rolling) count (i.e., number of records) for each 
key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
    */ 
-  def count(materialized: Materialized[K, Long, ByteArrayWindowStore]): 
KTable[Windowed[K], Long] = {
+  def count()(implicit materialized: Materialized[K, Long, 
ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
     val c: KTable[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, 
ByteArrayWindowStore]])
     c.mapValues[Long](Long2long _)
   }
 
-  /**
-   * Combine the values of records in this stream by the grouped key.
-   *
-   * @param reducer   a function that computes a new aggregate result
-   * @return a [[KTable]] that contains "update" records with unmodified keys, 
and values that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
-   */
-  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
-    inner.reduce(reducer.asReducer)
-
   /**
    * Combine the values of records in this stream by the grouped key.
    *
@@ -109,7 +73,8 @@ class TimeWindowedKStream[K, V](val inner: 
TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, ByteArrayWindowStore]): 
KTable[Windowed[K], V] =
+  def reduce(reducer: (V, V) => V)(
+    implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+  ): KTable[Windowed[K], V] =
     inner.reduce(reducer.asReducer, materialized)
 }
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 12b8c8cb7b3..5abc1bcf6ff 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -87,7 +87,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData 
{
     // generate word counts
     val wordCounts: KTable[String, Long] =
       textLines.flatMapValues(v => pattern.split(v.toLowerCase))
-        .groupBy((k, v) => v)
+        .groupBy((_, v) => v)
         .count()
 
     // write to output topic
@@ -119,7 +119,7 @@ class WordCountTest extends JUnitSuite with 
WordCountTestData {
     val wordCounts: KTable[String, Long] =
       textLines.flatMapValues(v => pattern.split(v.toLowerCase))
         .groupBy((k, v) => v)
-        .count(Materialized.as("word-count"))
+        .count()(Materialized.as("word-count"))
 
     // write to output topic
     wordCounts.toStream.to(outputTopic)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Scala API Wrapper for Streams uses default serializer for table aggregate
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-6936
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6936
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Daniel Heinrich
>            Priority: Major
>
> On of the goals of the Scala API is to not fall back on the configured 
> default serializer, but let the compiler provide them through implicits.
> The aggregate method on KGroupedStream misses to achieve this goal.
> Compared to the Java API is this behavior very supprising, because no other 
> stream operation falls back to the default serializer and a developer assums, 
> that the compiler checks for the correct serializer type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to