This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 86b1150 MINOR: Update Streams Scala API for addition of Grouped (#5793) 86b1150 is described below commit 86b1150e18d3fe0a0e3019e034e2a6f0204f7a17 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Tue Oct 16 07:24:50 2018 -0700 MINOR: Update Streams Scala API for addition of Grouped (#5793) While working on the documentation updates I realized the Streams Scala API needs to get updated for the addition of Grouped Added a test for Grouped.scala ran all streams-scala tests and streams tests Reviewers: Matthias J. Sax <matth...@confluent.io>, John Roesler <j...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- docs/streams/upgrade-guide.html | 3 ++ .../streams/kstream/internals/GroupedInternal.java | 2 +- .../kafka/streams/scala/ImplicitConversions.scala | 12 +++--- .../kstream/{Serialized.scala => Grouped.scala} | 32 +++++++++++---- .../kafka/streams/scala/kstream/Joined.scala | 22 ++++++++++ .../kafka/streams/scala/kstream/KStream.scala | 28 ++++++------- .../kafka/streams/scala/kstream/KTable.scala | 11 +++-- .../kafka/streams/scala/kstream/package.scala | 2 +- ...bleJoinScalaIntegrationTestImplicitSerdes.scala | 48 +++++++++++++++++++--- .../apache/kafka/streams/scala/TopologyTest.scala | 2 +- .../{SerializedTest.scala => GroupedTest.scala} | 30 +++++++++----- .../kafka/streams/scala/kstream/JoinedTest.scala | 10 +++++ 12 files changed, 150 insertions(+), 52 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 660b817..e79b106 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -88,6 +88,9 @@ Additionally, we've updated the <code>Joined</code> class with a new method <code>Joined#withName</code> enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join. For more details repartition topic naming, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping">KIP-372</a>. + + As a result we've updated the Kafka Streams Scala API and removed the <code>Serialized</code> class in favor of adding <code>Grouped</code>. + If you just rely on the implicit <code>Serialized</code>, you just need to recompile; if you pass in <code>Serialized</code> explicitly, sorry you'll have to make code changes. </p> <p> diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java index 2360fc6..3569caa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Grouped; public class GroupedInternal<K, V> extends Grouped<K, V> { - GroupedInternal(final Grouped<K, V> grouped) { + public GroupedInternal(final Grouped<K, V> grouped) { super(grouped); } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala index c2ac1ff..f62da2e 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala @@ -19,6 +19,8 @@ */ package org.apache.kafka.streams.scala +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{ KGroupedStream => KGroupedStreamJ, KGroupedTable => KGroupedTableJ, @@ -27,12 +29,10 @@ import org.apache.kafka.streams.kstream.{ SessionWindowedKStream => SessionWindowedKStreamJ, TimeWindowedKStream => TimeWindowedKStreamJ } +import org.apache.kafka.streams.processor.StateStore import org.apache.kafka.streams.scala.kstream._ -import org.apache.kafka.streams.KeyValue -import org.apache.kafka.common.serialization.Serde import scala.language.implicitConversions -import org.apache.kafka.streams.processor.StateStore /** * Implicit conversions between the Scala wrapper objects and the underlying Java @@ -61,10 +61,10 @@ object ImplicitConversions { implicit def tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2) // we would also like to allow users implicit serdes - // and these implicits will convert them to `Serialized`, `Produced` or `Consumed` + // and these implicits will convert them to `Grouped`, `Produced` or `Consumed` - implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] = - Serialized.`with`[K, V] + implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] = + Grouped.`with`[K, V] implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] = Consumed.`with`[K, V] diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala similarity index 53% rename from streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala rename to streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala index f48d9bf..355eb93 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala @@ -14,23 +14,39 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.kafka.streams.scala.kstream import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.streams.kstream.{Serialized => SerializedJ} +import org.apache.kafka.streams.kstream.{Grouped => GroupedJ} -object Serialized { +object Grouped { /** - * Construct a `Serialized` instance with the provided key and value [[Serde]]s. + * Construct a `Grouped` instance with the provided key and value [[Serde]]s. * If the [[Serde]] params are `null` the default serdes defined in the configs will be used. * - * @tparam K the key type - * @tparam V the value type + * @tparam K the key type + * @tparam V the value type * @param keySerde keySerde that will be used to materialize a stream * @param valueSerde valueSerde that will be used to materialize a stream - * @return a new instance of [[Serialized]] configured with the provided serdes + * @return a new instance of [[Grouped]] configured with the provided serdes */ - def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): SerializedJ[K, V] = - SerializedJ.`with`(keySerde, valueSerde) + def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): GroupedJ[K, V] = + GroupedJ.`with`(keySerde, valueSerde) + + /** + * Construct a `Grouped` instance with the provided key and value [[Serde]]s. + * If the [[Serde]] params are `null` the default serdes defined in the configs will be used. + * + * @tparam K the key type + * @tparam V the value type + * @param name the name used as part of a potential repartition topic + * @param keySerde keySerde that will be used to materialize a stream + * @param valueSerde valueSerde that will be used to materialize a stream + * @return a new instance of [[Grouped]] configured with the provided serdes + */ + def `with`[K, V](name: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): GroupedJ[K, V] = + GroupedJ.`with`(name, keySerde, valueSerde) + } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala index ffd3e61..b6dbb05 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala @@ -39,4 +39,26 @@ object Joined { otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] = JoinedJ.`with`(keySerde, valueSerde, otherValueSerde) + /** + * Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with key, value, and otherValue [[Serde]] + * instances. + * `null` values are accepted and will be replaced by the default serdes as defined in config. + * + * @tparam K key type + * @tparam V value type + * @tparam VO other value type + * @param name name of possible repartition topic + * @param keySerde the key serde to use. + * @param valueSerde the value serde to use. + * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used + * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes + */ + // disable spotless scala, which wants to make a mess of the argument lists + // format: off + def `with`[K, V, VO](name: String) + (implicit keySerde: Serde[K], + valueSerde: Serde[V], + otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] = + JoinedJ.`with`(keySerde, valueSerde, otherValueSerde, name) + // format:on } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index d54ac5a..635975b 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -23,8 +23,8 @@ package kstream import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _} import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor} -import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionsCompatConversions._ +import org.apache.kafka.streams.scala.ImplicitConversions._ import scala.collection.JavaConverters._ @@ -351,8 +351,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { /** * Group the records by their current key into a [[KGroupedStream]] * <p> - * The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit - * serdes that will be converted to a `Serialized` instance implicitly. + * The user can either supply the `Grouped` instance as an implicit in scope or she can also provide an implicit + * serdes that will be converted to a `Grouped` instance implicitly. * <p> * {{{ * Example: @@ -365,28 +365,28 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks)) * .map((_, regionWithClicks) => regionWithClicks) * - * // the groupByKey gets the Serialized instance through an implicit conversion of the + * // the groupByKey gets the Grouped instance through an implicit conversion of the * // serdes brought into scope through the import Serdes._ above * .groupByKey * .reduce(_ + _) * - * // Similarly you can create an implicit Serialized and it will be passed implicitly + * // Similarly you can create an implicit Grouped and it will be passed implicitly * // to the groupByKey call * }}} * - * @param serialized the instance of Serialized that gives the serdes + * @param grouped the instance of Grouped that gives the serdes * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#groupByKey` */ - def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] = - inner.groupByKey(serialized) + def groupByKey(implicit grouped: Grouped[K, V]): KGroupedStream[K, V] = + inner.groupByKey(grouped) /** * Group the records of this [[KStream]] on a new key that is selected using the provided key transformation function - * and the `Serialized` instance. + * and the `Grouped` instance. * <p> - * The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit - * serdes that will be converted to a `Serialized` instance implicitly. + * The user can either supply the `Grouped` instance as an implicit in scope or she can also provide an implicit + * serdes that will be converted to a `Grouped` instance implicitly. * <p> * {{{ * Example: @@ -401,7 +401,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * val wordCounts: KTable[String, Long] = * textLines.flatMapValues(v => pattern.split(v.toLowerCase)) * - * // the groupBy gets the Serialized instance through an implicit conversion of the + * // the groupBy gets the Grouped instance through an implicit conversion of the * // serdes brought into scope through the import Serdes._ above * .groupBy((k, v) => v) * @@ -412,8 +412,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#groupBy` */ - def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStream[KR, V] = - inner.groupBy(selector.asKeyValueMapper, serialized) + def groupBy[KR](selector: (K, V) => KR)(implicit grouped: Grouped[KR, V]): KGroupedStream[KR, V] = + inner.groupBy(selector.asKeyValueMapper, grouped) /** * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index 881c8e0..9ac27ee 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -20,11 +20,10 @@ package org.apache.kafka.streams.scala package kstream -import org.apache.kafka.common.serialization.Serde import org.apache.kafka.common.utils.Bytes import org.apache.kafka.streams.kstream.{KTable => KTableJ, _} -import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionsCompatConversions._ +import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.state.KeyValueStore /** @@ -213,15 +212,15 @@ class KTable[K, V](val inner: KTableJ[K, V]) { /** * Re-groups the records of this [[KTable]] using the provided key/value mapper - * and `Serde`s as specified by `Serialized`. + * and `Serde`s as specified by `Grouped`. * * @param selector a function that computes a new grouping key and value to be aggregated - * @param serialized the `Serialized` instance used to specify `Serdes` + * @param grouped the `Grouped` instance used to specify `Serdes` * @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]] * @see `org.apache.kafka.streams.kstream.KTable#groupBy` */ - def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] = - inner.groupBy(selector.asKeyValueMapper, serialized) + def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit grouped: Grouped[KR, VR]): KGroupedTable[KR, VR] = + inner.groupBy(selector.asKeyValueMapper, grouped) /** * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join. diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala index 842dd79..db4463b 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala @@ -20,7 +20,7 @@ import org.apache.kafka.streams.processor.StateStore package object kstream { type Materialized[K, V, S <: StateStore] = org.apache.kafka.streams.kstream.Materialized[K, V, S] - type Serialized[K, V] = org.apache.kafka.streams.kstream.Serialized[K, V] + type Grouped[K, V] = org.apache.kafka.streams.kstream.Grouped[K, V] type Consumed[K, V] = org.apache.kafka.streams.kstream.Consumed[K, V] type Produced[K, V] = org.apache.kafka.streams.kstream.Produced[K, V] type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO] diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 44c3605..523418d 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -40,8 +40,48 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ @Test def testShouldCountClicksPerRegion(): Unit = { - // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced, - // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will + // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Grouped, Produced, + // Consumed and Joined instances. So all APIs below that accept Grouped, Produced, Consumed or Joined will + // get these instances automatically + import Serdes._ + + val streamsConfiguration: Properties = getStreamsConfiguration() + + val builder = new StreamsBuilder() + + val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic) + + val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) + + // Compute the total per region by summing the individual click counts per region. + val clicksPerRegion: KTable[String, Long] = + userClicksStream + + // Join the stream against the table. + .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks)) + + // Change the stream from <user> -> <region, clicks> to <region> -> <clicks> + .map((_, regionWithClicks) => regionWithClicks) + + // Compute the total per region by summing the individual click counts per region. + .groupByKey + .reduce(_ + _) + + // Write the (continuously updating) results to the output topic. + clicksPerRegion.toStream.to(outputTopic) + + val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration) + streams.start() + + val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = + produceNConsume(userClicksTopic, userRegionsTopic, outputTopic) + streams.close() + } + + @Test def testShouldCountClicksPerRegionWithNamedRepartitionTopic(): Unit = { + + // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Grouped, Produced, + // Consumed and Joined instances. So all APIs below that accept Grouped, Produced, Consumed or Joined will // get these instances automatically import Serdes._ @@ -85,8 +125,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ import org.apache.kafka.streams.kstream.{KStream => KStreamJ, KTable => KTableJ, _} import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ} - import collection.JavaConverters._ - val streamsConfiguration: Properties = getStreamsConfiguration() streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) @@ -122,7 +160,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ // Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion - .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong)) + .groupByKey(Grouped.`with`[String, JLong](Serdes.String, Serdes.JavaLong)) .reduce { new Reducer[JLong] { def apply(v1: JLong, v2: JLong) = v1 + v2 diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 889e67c..a826401 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -191,7 +191,7 @@ class TopologyTest extends JUnitSuite { // Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion - .groupByKey(Serialized.`with`[String, JLong]) + .groupByKey(Grouped.`with`[String, JLong]) .reduce { new Reducer[JLong] { def apply(v1: JLong, v2: JLong) = v1 + v2 diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala similarity index 52% rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala index 4264fa5..728562a 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala @@ -1,6 +1,4 @@ /* - * Copyright (C) 2018 Joan Goyeau. - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,23 +14,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.kafka.streams.scala.kstream -import org.apache.kafka.streams.kstream.internals.SerializedInternal -import org.apache.kafka.streams.scala.Serdes._ +import org.apache.kafka.streams.kstream.internals.GroupedInternal import org.apache.kafka.streams.scala.Serdes +import org.apache.kafka.streams.scala.Serdes._ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{FlatSpec, Matchers} @RunWith(classOf[JUnitRunner]) -class SerializedTest extends FlatSpec with Matchers { +class GroupedTest extends FlatSpec with Matchers { - "Create a Serialized" should "create a Serialized with Serdes" in { - val serialized: Serialized[String, Long] = Serialized.`with`[String, Long] + "Create a Grouped" should "create a Grouped with Serdes" in { + val grouped: Grouped[String, Long] = Grouped.`with`[String, Long] - val internalSerialized = new SerializedInternal(serialized) - internalSerialized.keySerde.getClass shouldBe Serdes.String.getClass - internalSerialized.valueSerde.getClass shouldBe Serdes.Long.getClass + val internalGrouped = new GroupedInternal[String, Long](grouped) + internalGrouped.keySerde.getClass shouldBe Serdes.String.getClass + internalGrouped.valueSerde.getClass shouldBe Serdes.Long.getClass } + + "Create a Grouped with repartition topic name" should "create a Grouped with Serdes, and repartition topic name" in { + val repartitionTopicName = "repartition-topic" + val grouped: Grouped[String, Long] = Grouped.`with`(repartitionTopicName) + + val internalGrouped = new GroupedInternal[String, Long](grouped) + internalGrouped.keySerde.getClass shouldBe Serdes.String.getClass + internalGrouped.valueSerde.getClass shouldBe Serdes.Long.getClass + internalGrouped.name() shouldBe repartitionTopicName + } + } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala index 288b790..9a96a81 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala @@ -34,4 +34,14 @@ class JoinedTest extends FlatSpec with Matchers { joined.valueSerde.getClass shouldBe Serdes.Long.getClass joined.otherValueSerde.getClass shouldBe Serdes.Integer.getClass } + + "Create a Joined" should "create a Joined with Serdes and repartition topic name" in { + val repartitionTopicName = "repartition-topic" + val joined: Joined[String, Long, Int] = Joined.`with`(repartitionTopicName) + + joined.keySerde.getClass shouldBe Serdes.String.getClass + joined.valueSerde.getClass shouldBe Serdes.Long.getClass + joined.otherValueSerde.getClass shouldBe Serdes.Integer.getClass + joined.name() shouldBe repartitionTopicName + } }