[ https://issues.apache.org/jira/browse/KAFKA-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497374#comment-16497374 ]
ASF GitHub Bot commented on KAFKA-6936: --------------------------------------- guozhangwang closed pull request #5066: KAFKA-6936: Implicit Materialized for aggregates URL: https://github.com/apache/kafka/pull/5066 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala index c32563f563a..0c384a1bad0 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala @@ -19,19 +19,22 @@ */ package org.apache.kafka.streams.scala -import org.apache.kafka.streams.kstream.{KStream => KStreamJ, - KTable => KTableJ, +import org.apache.kafka.streams.kstream.{ KGroupedStream => KGroupedStreamJ, + KGroupedTable => KGroupedTableJ, + KStream => KStreamJ, + KTable => KTableJ, SessionWindowedKStream => SessionWindowedKStreamJ, TimeWindowedKStream => TimeWindowedKStreamJ, - KGroupedTable => KGroupedTableJ, _} - + _ +} import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.KeyValue import org.apache.kafka.common.serialization.Serde - import scala.language.implicitConversions +import org.apache.kafka.streams.processor.StateStore + /** * Implicit conversions between the Scala wrapper objects and the underlying Java * objects. @@ -70,6 +73,10 @@ object ImplicitConversions { implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] = Produced.`with`(keySerde, valueSerde) + implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K], + valueSerde: Serde[V]): Materialized[K, V, S] = + Materialized.`with`[K, V, S](keySerde, valueSerde) + implicit def joinedFromKeyValueOtherSerde[K, V, VO] (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] = Joined.`with`(keySerde, valueSerde, otherValueSerde) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala index 2e85bce91d1..0e5abfdd11b 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala @@ -36,18 +36,6 @@ import org.apache.kafka.streams.scala.FunctionConversions._ */ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { - /** - * Count the number of records in this stream by the grouped key. - * - * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that - * represent the latest (rolling) count (i.e., number of records) for each key - * @see `org.apache.kafka.streams.kstream.KGroupedStream#count` - */ - def count(): KTable[K, Long] = { - val c: KTable[K, java.lang.Long] = inner.count() - c.mapValues[Long](Long2long _) - } - /** * Count the number of records in this stream by the grouped key. * The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view) @@ -57,24 +45,13 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that * represent the latest (rolling) count (i.e., number of records) for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#count` - */ - def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { + */ + def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { val c: KTable[K, java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]]) c.mapValues[Long](Long2long _) } - /** - * Combine the values of records in this stream by the grouped key. - * - * @param reducer a function `(V, V) => V` that computes a new aggregate result. - * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key - * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce` - */ - def reduce(reducer: (V, V) => V): KTable[K, V] = - inner.reduce(reducer.asReducer) - /** * Combine the values of records in this stream by the grouped key. * @@ -83,38 +60,25 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce` - */ - def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = { - + */ + def reduce(reducer: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place // works perfectly with Scala 2.12 though - inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, materialized) - } - - /** - * Aggregate the values of records in this stream by the grouped key. - * - * @param initializer an `Initializer` that computes an initial intermediate aggregation result - * @param aggregator an `Aggregator` that computes a new aggregate result - * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key - * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate` - */ - def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[K, VR] = - inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator) + inner.reduce(reducer.asReducer, materialized) /** * Aggregate the values of records in this stream by the grouped key. * * @param initializer an `Initializer` that computes an initial intermediate aggregation result * @param aggregator an `Aggregator` that computes a new aggregate result - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate` - */ - def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, - materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = + */ + def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)( + implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] + ): KTable[K, VR] = inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala index 87a11c5b835..99bc83e1ba0 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala @@ -35,19 +35,6 @@ import org.apache.kafka.streams.scala.FunctionConversions._ */ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { - /** - * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to - * the same key into a new instance of [[KTable]]. - * - * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that - * represent the latest (rolling) count (i.e., number of records) for each key - * @see `org.apache.kafka.streams.kstream.KGroupedTable#count` - */ - def count(): KTable[K, Long] = { - val c: KTable[K, java.lang.Long] = inner.count() - c.mapValues[Long](Long2long _) - } - /** * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to * the same key into a new instance of [[KTable]]. @@ -57,7 +44,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * represent the latest (rolling) count (i.e., number of records) for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#count` */ - def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { + def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { val c: KTable[K, java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]]) c.mapValues[Long](Long2long _) @@ -69,30 +56,13 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * * @param adder a function that adds a new value to the aggregate result * @param subtractor a function that removed an old value from the aggregate result + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce` */ def reduce(adder: (V, V) => V, - subtractor: (V, V) => V): KTable[K, V] = - // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place - // works perfectly with Scala 2.12 though - inner.reduce(adder.asReducer, subtractor.asReducer) - - /** - * Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]] - * to the same key into a new instance of [[KTable]]. - * - * @param adder a function that adds a new value to the aggregate result - * @param subtractor a function that removed an old value from the aggregate result - * @param materialized an instance of `Materialized` used to materialize a state store. - * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key - * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce` - */ - def reduce(adder: (V, V) => V, - subtractor: (V, V) => V, - materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place // works perfectly with Scala 2.12 though inner.reduce(adder.asReducer, subtractor.asReducer, materialized) @@ -104,27 +74,13 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * @param initializer a function that provides an initial aggregate result value * @param adder a function that adds a new record to the aggregate result * @param subtractor an aggregator function that removed an old record from the aggregate result + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate` */ - def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR): KTable[K, VR] = - inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator) - - /** - * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]] - * to the same key into a new instance of [[KTable]] using default serializers and deserializers. - * - * @param initializer a function that provides an initial aggregate result value - * @param adder a function that adds a new record to the aggregate result - * @param subtractor an aggregator function that removed an old record from the aggregate result - * @param materialized an instance of `Materialized` used to materialize a state store. - * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key - * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate` - */ - def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, - subtractor: (K, V, VR) => VR, - materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = + def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)( + implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] + ): KTable[K, VR] = inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala index fd2a56564b8..ed41973c090 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala @@ -40,45 +40,18 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * * @param initializer the initializer function * @param aggregator the aggregator function - * @param sessionMerger the merger function + * @param merger the merger function + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate` */ def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, - merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] = - inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger) - - /** - * Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`. - * - * @param initializer the initializer function - * @param aggregator the aggregator function - * @param sessionMerger the merger function - * @param materialized an instance of `Materialized` used to materialize a state store. - * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent - * the latest (rolling) aggregate for each key within a window - * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate` - */ - def aggregate[VR](initializer: => VR)( - aggregator: (K, V, VR) => VR, - merger: (K, VR, VR) => VR, - materialized: Materialized[K, VR, ByteArraySessionStore] + merger: (K, VR, VR) => VR)( + implicit materialized: Materialized[K, VR, ByteArraySessionStore] ): KTable[Windowed[K], VR] = inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized) - /** - * Count the number of records in this stream by the grouped key into `SessionWindows`. - * - * @return a windowed [[KTable]] that contains "update" records with unmodified keys and `Long` values - * that represent the latest (rolling) count (i.e., number of records) for each key within a window - * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count` - */ - def count(): KTable[Windowed[K], Long] = { - val c: KTable[Windowed[K], java.lang.Long] = inner.count() - c.mapValues[Long](Long2long _) - } - /** * Count the number of records in this stream by the grouped key into `SessionWindows`. * @@ -87,23 +60,12 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * that represent the latest (rolling) count (i.e., number of records) for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count` */ - def count(materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = { + def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = { val c: KTable[Windowed[K], java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]]) c.mapValues[Long](Long2long _) } - /** - * Combine values of this stream by the grouped key into {@link SessionWindows}. - * - * @param reducer a reducer function that computes a new aggregate result. - * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent - * the latest (rolling) aggregate for each key within a window - * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce` - */ - def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = - inner.reduce((v1, v2) => reducer(v1, v2)) - /** * Combine values of this stream by the grouped key into {@link SessionWindows}. * @@ -113,7 +75,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce` */ - def reduce(reducer: (V, V) => V, - materialized: Materialized[K, V, ByteArraySessionStore]): KTable[Windowed[K], V] = + def reduce(reducer: (V, V) => V)( + implicit materialized: Materialized[K, V, ByteArraySessionStore] + ): KTable[Windowed[K], V] = inner.reduce(reducer.asReducer, materialized) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala index a16c72b948e..9e31ab9a80a 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala @@ -40,41 +40,16 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * * @param initializer an initializer function that computes an initial intermediate aggregation result * @param aggregator an aggregator function that computes a new aggregate result + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate` */ - def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] = - inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator) - - /** - * Aggregate the values of records in this stream by the grouped key. - * - * @param initializer an initializer function that computes an initial intermediate aggregation result - * @param aggregator an aggregator function that computes a new aggregate result - * @param materialized an instance of `Materialized` used to materialize a state store. - * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key - * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate` - */ - def aggregate[VR](initializer: => VR)( - aggregator: (K, V, VR) => VR, - materialized: Materialized[K, VR, ByteArrayWindowStore] + def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)( + implicit materialized: Materialized[K, VR, ByteArrayWindowStore] ): KTable[Windowed[K], VR] = inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized) - /** - * Count the number of records in this stream by the grouped key and the defined windows. - * - * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that - * represent the latest (rolling) count (i.e., number of records) for each key - * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count` - */ - def count(): KTable[Windowed[K], Long] = { - val c: KTable[Windowed[K], java.lang.Long] = inner.count() - c.mapValues[Long](Long2long _) - } - /** * Count the number of records in this stream by the grouped key and the defined windows. * @@ -83,23 +58,12 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * represent the latest (rolling) count (i.e., number of records) for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count` */ - def count(materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = { + def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = { val c: KTable[Windowed[K], java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]]) c.mapValues[Long](Long2long _) } - /** - * Combine the values of records in this stream by the grouped key. - * - * @param reducer a function that computes a new aggregate result - * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key - * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce` - */ - def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = - inner.reduce(reducer.asReducer) - /** * Combine the values of records in this stream by the grouped key. * @@ -109,7 +73,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce` */ - def reduce(reducer: (V, V) => V, - materialized: Materialized[K, V, ByteArrayWindowStore]): KTable[Windowed[K], V] = + def reduce(reducer: (V, V) => V)( + implicit materialized: Materialized[K, V, ByteArrayWindowStore] + ): KTable[Windowed[K], V] = inner.reduce(reducer.asReducer, materialized) } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala index 12b8c8cb7b3..5abc1bcf6ff 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala @@ -87,7 +87,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData { // generate word counts val wordCounts: KTable[String, Long] = textLines.flatMapValues(v => pattern.split(v.toLowerCase)) - .groupBy((k, v) => v) + .groupBy((_, v) => v) .count() // write to output topic @@ -119,7 +119,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData { val wordCounts: KTable[String, Long] = textLines.flatMapValues(v => pattern.split(v.toLowerCase)) .groupBy((k, v) => v) - .count(Materialized.as("word-count")) + .count()(Materialized.as("word-count")) // write to output topic wordCounts.toStream.to(outputTopic) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Scala API Wrapper for Streams uses default serializer for table aggregate > ------------------------------------------------------------------------- > > Key: KAFKA-6936 > URL: https://issues.apache.org/jira/browse/KAFKA-6936 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.0.0 > Reporter: Daniel Heinrich > Priority: Major > > On of the goals of the Scala API is to not fall back on the configured > default serializer, but let the compiler provide them through implicits. > The aggregate method on KGroupedStream misses to achieve this goal. > Compared to the Java API is this behavior very supprising, because no other > stream operation falls back to the default serializer and a developer assums, > that the compiler checks for the correct serializer type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)