This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new ecb7943 MINOR: Add Scalafmt to Streams Scala API (#4965)
ecb7943 is described below
commit ecb7943d852f19a6bf5b6cf7822d99a2217b5040
Author: Joan Goyeau <[email protected]>
AuthorDate: Tue Jul 10 01:48:34 2018 +0200
MINOR: Add Scalafmt to Streams Scala API (#4965)
Reviewers: Guozhang Wang <[email protected]>
---
build.gradle | 9 ++
checkstyle/.scalafmt.conf | 20 +++
.../kafka/streams/scala/FunctionConversions.scala | 8 +-
.../kafka/streams/scala/ImplicitConversions.scala | 5 +-
.../org/apache/kafka/streams/scala/Serdes.scala | 41 +++---
.../kafka/streams/scala/StreamsBuilder.scala | 32 ++---
.../streams/scala/kstream/KGroupedStream.scala | 7 +-
.../streams/scala/kstream/KGroupedTable.scala | 2 +-
.../kafka/streams/scala/kstream/KStream.scala | 147 ++++++++++-----------
.../kafka/streams/scala/kstream/KTable.scala | 134 +++++++++----------
.../scala/kstream/SessionWindowedKStream.scala | 9 +-
.../scala/kstream/TimeWindowedKStream.scala | 6 +-
...bleJoinScalaIntegrationTestImplicitSerdes.scala | 58 ++++----
.../streams/scala/StreamToTableJoinTestData.scala | 1 -
.../apache/kafka/streams/scala/TopologyTest.scala | 70 +++++-----
.../apache/kafka/streams/scala/WordCountTest.scala | 19 +--
16 files changed, 303 insertions(+), 265 deletions(-)
diff --git a/build.gradle b/build.gradle
index d4a3e8b..6d7c325 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,6 +29,15 @@ buildscript {
classpath 'org.scoverage:gradle-scoverage:2.3.0'
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
classpath 'org.owasp:dependency-check-gradle:3.1.2'
+ classpath "com.diffplug.spotless:spotless-plugin-gradle:3.10.0"
+ }
+}
+
+apply plugin: "com.diffplug.gradle.spotless"
+spotless {
+ scala {
+ target 'streams/**/*.scala'
+ scalafmt('1.5.1').configFile('checkstyle/.scalafmt.conf')
}
}
diff --git a/checkstyle/.scalafmt.conf b/checkstyle/.scalafmt.conf
new file mode 100644
index 0000000..057e3b9
--- /dev/null
+++ b/checkstyle/.scalafmt.conf
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+docstrings = JavaDoc
+maxColumn = 120
+continuationIndent.defnSite = 2
+assumeStandardLibraryStripMargin = true
+danglingParentheses = true
+rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers]
\ No newline at end of file
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index abf1659..65ea490 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -30,7 +30,7 @@ import java.lang.{Iterable => JIterable}
* more expressive, with less boilerplate and more succinct.
* <p>
* For Scala 2.11, most of these conversions need to be invoked explicitly, as
Scala 2.11 does not
- * have full support for SAM types.
+ * have full support for SAM types.
*/
object FunctionConversions {
@@ -40,7 +40,7 @@ object FunctionConversions {
}
}
- implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends
AnyVal {
+ implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends
AnyVal {
def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U,
VR] {
override def apply(key: T, value: U): VR = f(key, value)
}
@@ -49,7 +49,7 @@ object FunctionConversions {
}
}
- implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR,
VR)) extends AnyVal {
+ implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) =>
(KR, VR)) extends AnyVal {
def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new
KeyValueMapper[K, V, KeyValue[KR, VR]] {
override def apply(key: K, value: V): KeyValue[KR, VR] = {
val (kr, vr) = f(key, value)
@@ -88,7 +88,7 @@ object FunctionConversions {
}
}
- implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends
AnyVal {
+ implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends
AnyVal {
def asMerger: Merger[K, VR] = new Merger[K, VR] {
override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey,
aggOne, aggTwo)
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index 0c384a1..d1ff674 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -77,7 +77,8 @@ object ImplicitConversions {
valueSerde:
Serde[V]): Materialized[K, V, S] =
Materialized.`with`[K, V, S](keySerde, valueSerde)
- implicit def joinedFromKeyValueOtherSerde[K, V, VO]
- (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde:
Serde[VO]): Joined[K, V, VO] =
+ implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde:
Serde[K],
+ valueSerde: Serde[V],
+ otherValueSerde:
Serde[VO]): Joined[K, V, VO] =
Joined.`with`(keySerde, valueSerde, otherValueSerde)
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
index a0ffffa..8bfb083 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -25,47 +25,48 @@ import org.apache.kafka.common.serialization.{Deserializer,
Serde, Serializer, S
import org.apache.kafka.streams.kstream.WindowedSerdes
object Serdes {
- implicit val String: Serde[String] =
JSerdes.String()
- implicit val Long: Serde[Long] =
JSerdes.Long().asInstanceOf[Serde[Long]]
- implicit val JavaLong: Serde[java.lang.Long] =
JSerdes.Long()
- implicit val ByteArray: Serde[Array[Byte]] =
JSerdes.ByteArray()
+ implicit val String: Serde[String] = JSerdes.String()
+ implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+ implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+ implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] =
JSerdes.Bytes()
- implicit val Float: Serde[Float] =
JSerdes.Float().asInstanceOf[Serde[Float]]
- implicit val JavaFloat: Serde[java.lang.Float] =
JSerdes.Float()
- implicit val Double: Serde[Double] =
JSerdes.Double().asInstanceOf[Serde[Double]]
- implicit val JavaDouble: Serde[java.lang.Double] =
JSerdes.Double()
- implicit val Integer: Serde[Int] =
JSerdes.Integer().asInstanceOf[Serde[Int]]
- implicit val JavaInteger: Serde[java.lang.Integer] =
JSerdes.Integer()
+ implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+ implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+ implicit val Double: Serde[Double] =
JSerdes.Double().asInstanceOf[Serde[Double]]
+ implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+ implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+ implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new
WindowedSerdes.TimeWindowedSerde[T]()
- implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T]
= new WindowedSerdes.SessionWindowedSerde[T]()
+ implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T]
=
+ new WindowedSerdes.SessionWindowedSerde[T]()
def fromFn[T >: Null](serializer: T => Array[Byte], deserializer:
Array[Byte] => Option[T]): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
- override def serialize(topic: String, data: T): Array[Byte]
= serializer(data)
+ override def serialize(topic: String, data: T): Array[Byte] =
serializer(data)
override def configure(configs: util.Map[String, _], isKey: Boolean):
Unit = ()
- override def close(): Unit
= ()
+ override def close(): Unit = ()
},
new Deserializer[T] {
- override def deserialize(topic: String, data: Array[Byte]): T
= deserializer(data).orNull
+ override def deserialize(topic: String, data: Array[Byte]): T =
deserializer(data).orNull
override def configure(configs: util.Map[String, _], isKey: Boolean):
Unit = ()
- override def close(): Unit
= ()
+ override def close(): Unit = ()
}
)
def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
- deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+ deserializer: (String, Array[Byte]) => Option[T]):
Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
- override def serialize(topic: String, data: T): Array[Byte]
= serializer(topic, data)
+ override def serialize(topic: String, data: T): Array[Byte] =
serializer(topic, data)
override def configure(configs: util.Map[String, _], isKey: Boolean):
Unit = ()
- override def close(): Unit
= ()
+ override def close(): Unit = ()
},
new Deserializer[T] {
- override def deserialize(topic: String, data: Array[Byte]): T
= deserializer(topic, data).orNull
+ override def deserialize(topic: String, data: Array[Byte]): T =
deserializer(topic, data).orNull
override def configure(configs: util.Map[String, _], isKey: Boolean):
Unit = ()
- override def close(): Unit
= ()
+ override def close(): Unit = ()
}
)
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index af342ac..fcec778 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -31,18 +31,18 @@ import ImplicitConversions._
import scala.collection.JavaConverters._
/**
- * Wraps the Java class StreamsBuilder and delegates method calls to the
underlying Java object.
- */
+ * Wraps the Java class StreamsBuilder and delegates method calls to the
underlying Java object.
+ */
class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
/**
* Create a [[kstream.KStream]] from the specified topic.
* <p>
- * The `implicit Consumed` instance provides the values of
`auto.offset.reset` strategy, `TimestampExtractor`,
+ * The `implicit Consumed` instance provides the values of
`auto.offset.reset` strategy, `TimestampExtractor`,
* key and value deserializers etc. If the implicit is not found in scope,
compiler error will result.
* <p>
* A convenient alternative is to have the necessary implicit serdes in
scope, which will be implicitly
- * converted to generate an instance of `Consumed`. @see
[[ImplicitConversions]].
+ * converted to generate an instance of `Consumed`. @see
[[ImplicitConversions]].
* {{{
* // Brings all implicit conversions in scope
* import ImplicitConversions._
@@ -88,11 +88,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new
StreamsBuilderJ) {
/**
* Create a [[kstream.KTable]] from the specified topic.
* <p>
- * The `implicit Consumed` instance provides the values of
`auto.offset.reset` strategy, `TimestampExtractor`,
+ * The `implicit Consumed` instance provides the values of
`auto.offset.reset` strategy, `TimestampExtractor`,
* key and value deserializers etc. If the implicit is not found in scope,
compiler error will result.
* <p>
* A convenient alternative is to have the necessary implicit serdes in
scope, which will be implicitly
- * converted to generate an instance of `Consumed`. @see
[[ImplicitConversions]].
+ * converted to generate an instance of `Consumed`. @see
[[ImplicitConversions]].
* {{{
* // Brings all implicit conversions in scope
* import ImplicitConversions._
@@ -123,8 +123,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new
StreamsBuilderJ) {
* @see #table(String)
* @see `org.apache.kafka.streams.StreamsBuilder#table`
*/
- def table[K, V](topic: String, materialized: Materialized[K, V,
ByteArrayKeyValueStore])
- (implicit consumed: Consumed[K, V]): KTable[K, V] =
+ def table[K, V](topic: String, materialized: Materialized[K, V,
ByteArrayKeyValueStore])(
+ implicit consumed: Consumed[K, V]
+ ): KTable[K, V] =
inner.table[K, V](topic, consumed, materialized)
/**
@@ -139,8 +140,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new
StreamsBuilderJ) {
inner.globalTable(topic, consumed)
/**
- * Create a `GlobalKTable` from the specified topic. The resulting
`GlobalKTable` will be materialized
- * in a local `KeyValueStore` configured with the provided instance of
`Materialized`. The serializers
+ * Create a `GlobalKTable` from the specified topic. The resulting
`GlobalKTable` will be materialized
+ * in a local `KeyValueStore` configured with the provided instance of
`Materialized`. The serializers
* from the implicit `Consumed` instance will be used.
*
* @param topic the topic name
@@ -148,12 +149,13 @@ class StreamsBuilder(inner: StreamsBuilderJ = new
StreamsBuilderJ) {
* @return a `GlobalKTable` for the specified topic
* @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
*/
- def globalTable[K, V](topic: String, materialized: Materialized[K, V,
ByteArrayKeyValueStore])
- (implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
+ def globalTable[K, V](topic: String, materialized: Materialized[K, V,
ByteArrayKeyValueStore])(
+ implicit consumed: Consumed[K, V]
+ ): GlobalKTable[K, V] =
inner.globalTable(topic, consumed, materialized)
/**
- * Adds a state store to the underlying `Topology`. The store must still be
"connected" to a `Processor`,
+ * Adds a state store to the underlying `Topology`. The store must still be
"connected" to a `Processor`,
* `Transformer`, or `ValueTransformer` before it can be used.
*
* @param builder the builder used to obtain this state store `StateStore`
instance
@@ -164,11 +166,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new
StreamsBuilderJ) {
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ =
inner.addStateStore(builder)
/**
- * Adds a global `StateStore` to the topology. Global stores should not be
added to `Processor, `Transformer`,
+ * Adds a global `StateStore` to the topology. Global stores should not be
added to `Processor, `Transformer`,
* or `ValueTransformer` (in contrast to regular stores).
*
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
- */
+ */
def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[_, _],
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 0e5abfd..f6a22d9 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.{KGroupedStream =>
KGroupedStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
-
/**
* Wraps the Java class KGroupedStream and delegates method calls to the
underlying Java object.
*
@@ -41,7 +40,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* The result is written into a local `KeyValueStore` (which is basically an
ever-updating materialized view)
* provided by the given `materialized`.
*
- * @param materialized an instance of `Materialized` used to materialize a
state store.
+ * @param materialized an instance of `Materialized` used to materialize a
state store.
* @return a [[KTable]] that contains "update" records with unmodified keys
and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each
key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
@@ -55,8 +54,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
/**
* Combine the values of records in this stream by the grouped key.
*
- * @param reducer a function `(V, V) => V` that computes a new aggregate
result.
- * @param materialized an instance of `Materialized` used to materialize a
state store.
+ * @param reducer a function `(V, V) => V` that computes a new aggregate
result.
+ * @param materialized an instance of `Materialized` used to materialize a
state store.
* @return a [[KTable]] that contains "update" records with unmodified keys,
and values that represent the
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 99bc83e..76ea9ed 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -39,7 +39,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* Count number of records of the original [[KTable]] that got
[[KTable#groupBy]] to
* the same key into a new instance of [[KTable]].
*
- * @param materialized an instance of `Materialized` used to materialize a
state store.
+ * @param materialized an instance of `Materialized` used to materialize a
state store.
* @return a [[KTable]] that contains "update" records with unmodified keys
and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each
key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 8806f5c..a8766bd 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -46,7 +46,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param predicate a filter that is applied to each record
* @return a [[KStream]] that contains only those records that satisfy the
given predicate
* @see `org.apache.kafka.streams.kstream.KStream#filter`
- */
+ */
def filter(predicate: (K, V) => Boolean): KStream[K, V] =
inner.filter(predicate.asPredicate)
@@ -57,7 +57,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param predicate a filter that is applied to each record
* @return a [[KStream]] that contains only those records that do
<em>not</em> satisfy the given predicate
* @see `org.apache.kafka.streams.kstream.KStream#filterNot`
- */
+ */
def filterNot(predicate: (K, V) => Boolean): KStream[K, V] =
inner.filterNot(predicate.asPredicate)
@@ -70,7 +70,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param mapper a function `(K, V) => KR` that computes a new key for each
record
* @return a [[KStream]] that contains records with new key (possibly of
different type) and unmodified value
* @see `org.apache.kafka.streams.kstream.KStream#selectKey`
- */
+ */
def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] =
inner.selectKey[KR](mapper.asKeyValueMapper)
@@ -83,7 +83,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param mapper a function `(K, V) => (KR, VR)` that computes a new output
record
* @return a [[KStream]] that contains records with new key and value
(possibly both of different type)
* @see `org.apache.kafka.streams.kstream.KStream#map`
- */
+ */
def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] = {
val kvMapper = mapper.tupled andThen tuple2ToKeyValue
inner.map[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
@@ -97,7 +97,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param mapper, a function `V => VR` that computes a new output value
* @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#mapValues`
- */
+ */
def mapValues[VR](mapper: V => VR): KStream[K, VR] =
inner.mapValues[VR](mapper.asValueMapper)
@@ -109,7 +109,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param mapper, a function `(K, V) => VR` that computes a new output value
* @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#mapValues`
- */
+ */
def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] =
inner.mapValues[VR](mapper.asValueMapperWithKey)
@@ -122,10 +122,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param mapper function `(K, V) => Iterable[(KR, VR)]` that computes the
new output records
* @return a [[KStream]] that contains more or less records with new key and
value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#flatMap`
- */
+ */
def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR] =
{
val kvMapper = mapper.tupled andThen (iter =>
iter.map(tuple2ToKeyValue).asJava)
- inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k , v)).asKeyValueMapper)
+ inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
}
/**
@@ -139,7 +139,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param mapper a function `V => Iterable[VR]` that computes the new output
values
* @return a [[KStream]] that contains more or less records with unmodified
keys and new values of different type
* @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
- */
+ */
def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] =
inner.flatMapValues[VR](mapper.asValueMapper)
@@ -154,7 +154,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param mapper a function `(K, V) => Iterable[VR]` that computes the new
output values
* @return a [[KStream]] that contains more or less records with unmodified
keys and new values of different type
* @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
- */
+ */
def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] =
inner.flatMapValues[VR](mapper.asValueMapperWithKey)
@@ -187,7 +187,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.branch(predicates.map(_.asPredicate): _*).map(kstream =>
wrapKStream(kstream))
/**
- * Materialize this stream to a topic and creates a new [[KStream]] from the
topic using the `Produced` instance for
+ * Materialize this stream to a topic and creates a new [[KStream]] from the
topic using the `Produced` instance for
* configuration of the `Serde key serde`, `Serde value serde`, and
`StreamPartitioner`
* <p>
* The user can either supply the `Produced` instance as an implicit in
scope or she can also provide implicit
@@ -219,7 +219,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.through(topic, produced)
/**
- * Materialize this stream to a topic using the `Produced` instance for
+ * Materialize this stream to a topic using the `Produced` instance for
* configuration of the `Serde key serde`, `Serde value serde`, and
`StreamPartitioner`
* <p>
* The user can either supply the `Produced` instance as an implicit in
scope or she can also provide implicit
@@ -250,34 +250,34 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.to(topic, produced)
/**
- * Dynamically materialize this stream to topics using the `Produced`
instance for
- * configuration of the `Serde key serde`, `Serde value serde`, and
`StreamPartitioner`.
- * The topic names for each record to send to is dynamically determined
based on the given mapper.
- * <p>
- * The user can either supply the `Produced` instance as an implicit in
scope or she can also provide implicit
- * key and value serdes that will be converted to a `Produced` instance
implicitly.
- * <p>
- * {{{
- * Example:
- *
- * // brings implicit serdes in scope
- * import Serdes._
- *
- * //..
- * val clicksPerRegion: KTable[String, Long] = //..
- *
- * // Implicit serdes in scope will generate an implicit Produced instance,
which
- * // will be passed automatically to the call of through below
- * clicksPerRegion.to(topicChooser)
- *
- * // Similarly you can create an implicit Produced and it will be passed
implicitly
- * // to the through call
- * }}}
- *
- * @param extractor the extractor to determine the name of the Kafka topic
to write to for reach record
- * @param (implicit) produced the instance of Produced that gives the
serdes and `StreamPartitioner`
- * @see `org.apache.kafka.streams.kstream.KStream#to`
- */
+ * Dynamically materialize this stream to topics using the `Produced`
instance for
+ * configuration of the `Serde key serde`, `Serde value serde`, and
`StreamPartitioner`.
+ * The topic names for each record to send to is dynamically determined
based on the given mapper.
+ * <p>
+ * The user can either supply the `Produced` instance as an implicit in
scope or she can also provide implicit
+ * key and value serdes that will be converted to a `Produced` instance
implicitly.
+ * <p>
+ * {{{
+ * Example:
+ *
+ * // brings implicit serdes in scope
+ * import Serdes._
+ *
+ * //..
+ * val clicksPerRegion: KTable[String, Long] = //..
+ *
+ * // Implicit serdes in scope will generate an implicit Produced instance,
which
+ * // will be passed automatically to the call of through below
+ * clicksPerRegion.to(topicChooser)
+ *
+ * // Similarly you can create an implicit Produced and it will be passed
implicitly
+ * // to the through call
+ * }}}
+ *
+ * @param extractor the extractor to determine the name of the Kafka topic
to write to for reach record
+ * @param (implicit) produced the instance of Produced that gives the serdes
and `StreamPartitioner`
+ * @see `org.apache.kafka.streams.kstream.KStream#to`
+ */
def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K,
V]): Unit =
inner.to(extractor, produced)
@@ -303,14 +303,14 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Transform the value of each input record into a new value (with possible
new type) of the output record.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
* record value and computes a new value for it.
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
+ * In order to assign a state, the state must be created and registered
+ * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
*
* @param valueTransformerSupplier a instance of `ValueTransformerSupplier`
that generates a `ValueTransformer`
* @param stateStoreNames the names of the state stores used by the
processor
* @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
+ */
def transformValues[VR](valueTransformerSupplier:
ValueTransformerSupplier[V, VR],
stateStoreNames: String*): KStream[K, VR] =
inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
@@ -319,29 +319,28 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Transform the value of each input record into a new value (with possible
new type) of the output record.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
* record value and computes a new value for it.
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
+ * In order to assign a state, the state must be created and registered
+ * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
*
* @param valueTransformerSupplier a instance of
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
* @param stateStoreNames the names of the state stores used by the
processor
* @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
+ */
def transformValues[VR](valueTransformerSupplier:
ValueTransformerWithKeySupplier[K, V, VR],
- stateStoreNames: String*): KStream[K, VR] = {
+ stateStoreNames: String*): KStream[K, VR] =
inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
- }
/**
* Process all records in this stream, one record at a time, by applying a
`Processor` (provided by the given
* `processorSupplier`).
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
+ * In order to assign a state, the state must be created and registered
+ * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
*
* @param processorSupplier a function that generates a
[[org.apache.kafka.stream.Processor]]
* @param stateStoreNames the names of the state store used by the
processor
* @see `org.apache.kafka.streams.kstream.KStream#process`
- */
+ */
def process(processorSupplier: () => Processor[K, V], stateStoreNames:
String*): Unit = {
val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K,
V] {
override def get(): Processor[K, V] = processorSupplier()
@@ -350,7 +349,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
}
/**
- * Group the records by their current key into a [[KGroupedStream]]
+ * Group the records by their current key into a [[KGroupedStream]]
* <p>
* The user can either supply the `Serialized` instance as an implicit in
scope or she can also provide an implicit
* serdes that will be converted to a `Serialized` instance implicitly.
@@ -375,10 +374,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* // to the groupByKey call
* }}}
*
- * @param (implicit) serialized the instance of Serialized that gives the
serdes
+ * @param (implicit) serialized the instance of Serialized that gives the
serdes
* @return a [[KGroupedStream]] that contains the grouped records of the
original [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
- */
+ */
def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] =
inner.groupByKey(serialized)
@@ -412,18 +411,18 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param selector a function that computes a new key for grouping
* @return a [[KGroupedStream]] that contains the grouped records of the
original [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#groupBy`
- */
+ */
def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR,
V]): KGroupedStream[KR, V] =
inner.groupBy(selector.asKeyValueMapper, serialized)
/**
- * Join records of this stream with another [[KStream]]'s records using
windowed inner equi join with
+ * Join records of this stream with another [[KStream]]'s records using
windowed inner equi join with
* serializers and deserializers supplied by the implicit `Joined` instance.
*
* @param otherStream the [[KStream]] to be joined with this stream
* @param joiner a function that computes the join result for a pair of
matching records
* @param windows the specification of the `JoinWindows`
- * @param joined an implicit `Joined` instance that defines the serdes
to be used to serialize/deserialize
+ * @param joined an implicit `Joined` instance that defines the serdes
to be used to serialize/deserialize
* inputs and outputs of the joined streams. Instead of
`Joined`, the user can also supply
* key serde, value serde and other value serde in
implicit scope and they will be
* converted to the instance of `Joined` through implicit
conversion
@@ -438,17 +437,17 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows,
joined)
/**
- * Join records of this stream with another [[KTable]]'s records using inner
equi join with
+ * Join records of this stream with another [[KTable]]'s records using inner
equi join with
* serializers and deserializers supplied by the implicit `Joined` instance.
*
* @param table the [[KTable]] to be joined with this stream
* @param joiner a function that computes the join result for a pair of
matching records
- * @param joined an implicit `Joined` instance that defines the serdes to
be used to serialize/deserialize
+ * @param joined an implicit `Joined` instance that defines the serdes to
be used to serialize/deserialize
* inputs and outputs of the joined streams. Instead of
`Joined`, the user can also supply
* key serde, value serde and other value serde in implicit
scope and they will be
* converted to the instance of `Joined` through implicit
conversion
* @return a [[KStream]] that contains join-records for each key and values
computed by the given `joiner`,
- * one for each matched record-pair with the same key
+ * one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KStream#join`
*/
def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit
joined: Joined[K, V, VT]): KStream[K, VR] =
@@ -464,7 +463,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains join-records for each key and values
computed by the given `joiner`,
* one output for each input [[KStream]] record
* @see `org.apache.kafka.streams.kstream.KStream#join`
- */
+ */
def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
keyValueMapper: (K, V) => GK,
joiner: (V, GV) => RV
@@ -476,20 +475,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
)
/**
- * Join records of this stream with another [[KStream]]'s records using
windowed left equi join with
+ * Join records of this stream with another [[KStream]]'s records using
windowed left equi join with
* serializers and deserializers supplied by the implicit `Joined` instance.
*
* @param otherStream the [[KStream]] to be joined with this stream
* @param joiner a function that computes the join result for a pair of
matching records
* @param windows the specification of the `JoinWindows`
- * @param joined an implicit `Joined` instance that defines the serdes
to be used to serialize/deserialize
+ * @param joined an implicit `Joined` instance that defines the serdes
to be used to serialize/deserialize
* inputs and outputs of the joined streams. Instead of
`Joined`, the user can also supply
* key serde, value serde and other value serde in
implicit scope and they will be
* converted to the instance of `Joined` through implicit
conversion
* @return a [[KStream]] that contains join-records for each key and values
computed by the given `joiner`,
* one for each matched record-pair with the same key and
within the joining window intervals
* @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
- */
+ */
def leftJoin[VO, VR](otherStream: KStream[K, VO])(
joiner: (V, VO) => VR,
windows: JoinWindows
@@ -497,19 +496,19 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows,
joined)
/**
- * Join records of this stream with another [[KTable]]'s records using left
equi join with
+ * Join records of this stream with another [[KTable]]'s records using left
equi join with
* serializers and deserializers supplied by the implicit `Joined` instance.
*
* @param table the [[KTable]] to be joined with this stream
* @param joiner a function that computes the join result for a pair of
matching records
- * @param joined an implicit `Joined` instance that defines the serdes to
be used to serialize/deserialize
+ * @param joined an implicit `Joined` instance that defines the serdes to
be used to serialize/deserialize
* inputs and outputs of the joined streams. Instead of
`Joined`, the user can also supply
* key serde, value serde and other value serde in implicit
scope and they will be
* converted to the instance of `Joined` through implicit
conversion
* @return a [[KStream]] that contains join-records for each key and values
computed by the given `joiner`,
- * one for each matched record-pair with the same key
+ * one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
- */
+ */
def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit
joined: Joined[K, V, VT]): KStream[K, VR] =
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
@@ -523,7 +522,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains join-records for each key and values
computed by the given `joiner`,
* one output for each input [[KStream]] record
* @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
- */
+ */
def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
keyValueMapper: (K, V) => GK,
joiner: (V, GV) => RV
@@ -531,20 +530,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper,
joiner.asValueJoiner)
/**
- * Join records of this stream with another [[KStream]]'s records using
windowed outer equi join with
+ * Join records of this stream with another [[KStream]]'s records using
windowed outer equi join with
* serializers and deserializers supplied by the implicit `Joined` instance.
*
* @param otherStream the [[KStream]] to be joined with this stream
* @param joiner a function that computes the join result for a pair of
matching records
* @param windows the specification of the `JoinWindows`
- * @param joined an implicit `Joined` instance that defines the serdes
to be used to serialize/deserialize
+ * @param joined an implicit `Joined` instance that defines the serdes
to be used to serialize/deserialize
* inputs and outputs of the joined streams. Instead of
`Joined`, the user can also supply
* key serde, value serde and other value serde in
implicit scope and they will be
* converted to the instance of `Joined` through implicit
conversion
* @return a [[KStream]] that contains join-records for each key and values
computed by the given `joiner`,
* one for each matched record-pair with the same key and within the joining
window intervals
* @see `org.apache.kafka.streams.kstream.KStream#outerJoin`
- */
+ */
def outerJoin[VO, VR](otherStream: KStream[K, VO])(
joiner: (V, VO) => VR,
windows: JoinWindows
@@ -554,8 +553,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
/**
* Merge this stream and the given stream into one larger stream.
* <p>
- * There is no ordering guarantee between records from this `KStream` and
records from the provided `KStream`
- * in the merged stream. Relative order is preserved within each input
stream though (ie, records within
+ * There is no ordering guarantee between records from this `KStream` and
records from the provided `KStream`
+ * in the merged stream. Relative order is preserved within each input
stream though (ie, records within
* one input stream are processed in order).
*
* @param stream a stream which is to be merged into this stream
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index cff1844..b669771 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -44,10 +44,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param predicate a filter that is applied to each record
* @return a [[KTable]] that contains only those records that satisfy the
given predicate
* @see `org.apache.kafka.streams.kstream.KTable#filter`
- */
- def filter(predicate: (K, V) => Boolean): KTable[K, V] = {
+ */
+ def filter(predicate: (K, V) => Boolean): KTable[K, V] =
inner.filter(predicate(_, _))
- }
/**
* Create a new [[KTable]] that consists all records of this [[KTable]]
which satisfies the given
@@ -55,12 +54,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*
* @param predicate a filter that is applied to each record
* @param materialized a `Materialized` that describes how the `StateStore`
for the resulting [[KTable]]
- * should be materialized.
+ * should be materialized.
* @return a [[KTable]] that contains only those records that satisfy the
given predicate
* @see `org.apache.kafka.streams.kstream.KTable#filter`
- */
- def filter(predicate: (K, V) => Boolean,
- materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+ */
+ def filter(predicate: (K, V) => Boolean, materialized: Materialized[K, V,
ByteArrayKeyValueStore]): KTable[K, V] =
inner.filter(predicate.asPredicate, materialized)
/**
@@ -70,7 +68,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param predicate a filter that is applied to each record
* @return a [[KTable]] that contains only those records that do
<em>not</em> satisfy the given predicate
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
- */
+ */
def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
inner.filterNot(predicate(_, _))
@@ -80,12 +78,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*
* @param predicate a filter that is applied to each record
* @param materialized a `Materialized` that describes how the `StateStore`
for the resulting [[KTable]]
- * should be materialized.
+ * should be materialized.
* @return a [[KTable]] that contains only those records that do
<em>not</em> satisfy the given predicate
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
- */
- def filterNot(predicate: (K, V) => Boolean,
- materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+ */
+ def filterNot(predicate: (K, V) => Boolean, materialized: Materialized[K, V,
ByteArrayKeyValueStore]): KTable[K, V] =
inner.filterNot(predicate.asPredicate, materialized)
/**
@@ -97,7 +94,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param mapper, a function `V => VR` that computes a new output value
* @return a [[KTable]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
- */
+ */
def mapValues[VR](mapper: V => VR): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapper)
@@ -109,12 +106,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*
* @param mapper, a function `V => VR` that computes a new output value
* @param materialized a `Materialized` that describes how the `StateStore`
for the resulting [[KTable]]
- * should be materialized.
+ * should be materialized.
* @return a [[KTable]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
- */
- def mapValues[VR](mapper: V => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+ */
+ def mapValues[VR](mapper: V => VR, materialized: Materialized[K, VR,
ByteArrayKeyValueStore]): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapper, materialized)
/**
@@ -126,7 +122,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param mapper, a function `(K, V) => VR` that computes a new output value
* @return a [[KTable]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
- */
+ */
def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapperWithKey)
@@ -138,12 +134,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*
* @param mapper, a function `(K, V) => VR` that computes a new output value
* @param materialized a `Materialized` that describes how the `StateStore`
for the resulting [[KTable]]
- * should be materialized.
+ * should be materialized.
* @return a [[KTable]] that contains records with unmodified key and new
values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
- */
- def mapValues[VR](mapper: (K, V) => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+ */
+ def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR,
ByteArrayKeyValueStore]): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapperWithKey)
/**
@@ -165,57 +160,55 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
inner.toStream[KR](mapper.asKeyValueMapper)
/**
- * Create a new `KTable` by transforming the value of each record in this
`KTable` into a new value, (with possibly new type).
- * Transform the value of each input record into a new value (with possible
new type) of the output record.
- * A `ValueTransformerWithKey` (provided by the given
`ValueTransformerWithKeySupplier`) is applied to each input
- * record value and computes a new value for it.
- * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible,
allowing access to additional state-stores,
- * and to the `ProcessorContext`.
- * If the downstream topology uses aggregation functions, (e.g.
`KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc),
- * care must be taken when dealing with state, (either held in state-stores
or transformer instances), to ensure correct
- * aggregate results.
- * In contrast, if the resulting KTable is materialized, (cf.
`#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
- * such concerns are handled for you.
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
- *
- * @param valueTransformerWithKeySupplier a instance of
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
- * At least one transformer instance will
be created per streaming task.
- * Transformer implementations doe not need
to be thread-safe.
- * @param stateStoreNames the names of the state stores used by
the processor
- * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
+ * Create a new `KTable` by transforming the value of each record in this
`KTable` into a new value, (with possibly new type).
+ * Transform the value of each input record into a new value (with possible
new type) of the output record.
+ * A `ValueTransformerWithKey` (provided by the given
`ValueTransformerWithKeySupplier`) is applied to each input
+ * record value and computes a new value for it.
+ * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible,
allowing access to additional state-stores,
+ * and to the `ProcessorContext`.
+ * If the downstream topology uses aggregation functions, (e.g.
`KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc),
+ * care must be taken when dealing with state, (either held in state-stores
or transformer instances), to ensure correct
+ * aggregate results.
+ * In contrast, if the resulting KTable is materialized, (cf.
`#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
+ * such concerns are handled for you.
+ * In order to assign a state, the state must be created and registered
+ * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
+ *
+ * @param valueTransformerWithKeySupplier a instance of
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
+ * At least one transformer instance will be
created per streaming task.
+ * Transformer implementations doe not need
to be thread-safe.
+ * @param stateStoreNames the names of the state stores used by the
processor
+ * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
+ * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
+ */
def transformValues[VR](valueTransformerWithKeySupplier:
ValueTransformerWithKeySupplier[K, V, VR],
- stateStoreNames: String*): KTable[K, VR] = {
+ stateStoreNames: String*): KTable[K, VR] =
inner.transformValues[VR](valueTransformerWithKeySupplier,
stateStoreNames: _*)
- }
/**
- * Create a new `KTable` by transforming the value of each record in this
`KTable` into a new value, (with possibly new type).
- * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
- * record value and computes a new value for it.
- * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible,
allowing stateful, rather than stateless,
- * record-by-record operation, access to additional state-stores, and
access to the `ProcessorContext`.
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
- * The resulting `KTable` is materialized into another state store
(additional to the provided state store names)
- * as specified by the user via `Materialized` parameter, and is queryable
through its given name.
- *
- * @param valueTransformerWithKeySupplier a instance of
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
- * At least one transformer instance will
be created per streaming task.
- * Transformer implementations doe not need
to be thread-safe.
- * @param materialized an instance of `Materialized` used to
describe how the state store of the
- * resulting table should be materialized.
- * @param stateStoreNames the names of the state stores used by
the processor
- * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
+ * Create a new `KTable` by transforming the value of each record in this
`KTable` into a new value, (with possibly new type).
+ * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
+ * record value and computes a new value for it.
+ * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible,
allowing stateful, rather than stateless,
+ * record-by-record operation, access to additional state-stores, and access
to the `ProcessorContext`.
+ * In order to assign a state, the state must be created and registered
+ * beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
+ * The resulting `KTable` is materialized into another state store
(additional to the provided state store names)
+ * as specified by the user via `Materialized` parameter, and is queryable
through its given name.
+ *
+ * @param valueTransformerWithKeySupplier a instance of
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
+ * At least one transformer instance will be
created per streaming task.
+ * Transformer implementations doe not need
to be thread-safe.
+ * @param materialized an instance of `Materialized` used to
describe how the state store of the
+ * resulting table should be materialized.
+ * @param stateStoreNames the names of the state stores used by the
processor
+ * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
+ * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
+ */
def transformValues[VR](valueTransformerWithKeySupplier:
ValueTransformerWithKeySupplier[K, V, VR],
materialized: Materialized[K, VR,
KeyValueStore[Bytes, Array[Byte]]],
- stateStoreNames: String*): KTable[K, VR] = {
+ stateStoreNames: String*): KTable[K, VR] =
inner.transformValues[VR](valueTransformerWithKeySupplier, materialized,
stateStoreNames: _*)
- }
/**
* Re-groups the records of this [[KTable]] using the provided key/value
mapper
@@ -247,7 +240,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param other the other [[KTable]] to be joined with this [[KTable]]
* @param joiner a function that computes the join result for a pair of
matching records
* @param materialized a `Materialized` that describes how the `StateStore`
for the resulting [[KTable]]
- * should be materialized.
+ * should be materialized.
* @return a [[KTable]] that contains join-records for each key and values
computed by the given joiner,
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#join`
@@ -276,7 +269,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param other the other [[KTable]] to be joined with this [[KTable]]
* @param joiner a function that computes the join result for a pair of
matching records
* @param materialized a `Materialized` that describes how the `StateStore`
for the resulting [[KTable]]
- * should be materialized.
+ * should be materialized.
* @return a [[KTable]] that contains join-records for each key and values
computed by the given joiner,
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
@@ -305,11 +298,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param other the other [[KTable]] to be joined with this [[KTable]]
* @param joiner a function that computes the join result for a pair of
matching records
* @param materialized a `Materialized` that describes how the `StateStore`
for the resulting [[KTable]]
- * should be materialized.
+ * should be materialized.
* @return a [[KTable]] that contains join-records for each key and values
computed by the given joiner,
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
- */
+ */
def outerJoin[VO, VR](other: KTable[K, VO])(
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
@@ -323,4 +316,3 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*/
def queryableStoreName: String = inner.queryableStoreName
}
-
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index ed41973..a602767 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -46,8 +46,7 @@ class SessionWindowedKStream[K, V](val inner:
SessionWindowedKStreamJ[K, V]) {
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
*/
- def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
- merger: (K, VR, VR) => VR)(
+ def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger:
(K, VR, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArraySessionStore]
): KTable[Windowed[K], VR] =
inner.aggregate((() => initializer).asInitializer,
aggregator.asAggregator, merger.asMerger, materialized)
@@ -55,7 +54,7 @@ class SessionWindowedKStream[K, V](val inner:
SessionWindowedKStreamJ[K, V]) {
/**
* Count the number of records in this stream by the grouped key into
`SessionWindows`.
*
- * @param materialized an instance of `Materialized` used to materialize a
state store.
+ * @param materialized an instance of `Materialized` used to materialize a
state store.
* @return a windowed [[KTable]] that contains "update" records with
unmodified keys and `Long` values
* that represent the latest (rolling) count (i.e., number of records) for
each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
@@ -69,8 +68,8 @@ class SessionWindowedKStream[K, V](val inner:
SessionWindowedKStreamJ[K, V]) {
/**
* Combine values of this stream by the grouped key into {@link
SessionWindows}.
*
- * @param reducer a reducer function that computes a new aggregate
result.
- * @param materialized an instance of `Materialized` used to materialize a
state store.
+ * @param reducer a reducer function that computes a new aggregate
result.
+ * @param materialized an instance of `Materialized` used to materialize a
state store.
* @return a windowed [[KTable]] that contains "update" records with
unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 9e31ab9..9be5794 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -53,11 +53,11 @@ class TimeWindowedKStream[K, V](val inner:
TimeWindowedKStreamJ[K, V]) {
/**
* Count the number of records in this stream by the grouped key and the
defined windows.
*
- * @param materialized an instance of `Materialized` used to materialize a
state store.
+ * @param materialized an instance of `Materialized` used to materialize a
state store.
* @return a [[KTable]] that contains "update" records with unmodified keys
and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each
key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
- */
+ */
def count()(implicit materialized: Materialized[K, Long,
ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
val c: KTable[Windowed[K], java.lang.Long] =
inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long,
ByteArrayWindowStore]])
@@ -68,7 +68,7 @@ class TimeWindowedKStream[K, V](val inner:
TimeWindowedKStreamJ[K, V]) {
* Combine the values of records in this stream by the grouped key.
*
* @param reducer a function that computes a new aggregate result
- * @param materialized an instance of `Materialized` used to materialize a
state store.
+ * @param materialized an instance of `Materialized` used to materialize a
state store.
* @return a [[KTable]] that contains "update" records with unmodified keys,
and values that represent the
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 02d1dab..7891131 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -40,9 +40,8 @@ import org.scalatest.junit.JUnitSuite
* <p>
* Note: In the current project settings SAM type conversion is turned off as
it's experimental in Scala 2.11.
* Hence the native Java API based version is more verbose.
- */
-class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
- with StreamToTableJoinTestData {
+ */
+class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
with StreamToTableJoinTestData {
private val privateCluster: EmbeddedKafkaCluster = new
EmbeddedKafkaCluster(1)
@@ -67,7 +66,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
@Test def testShouldCountClicksPerRegion(): Unit = {
- // DefaultSerdes brings into scope implicit serdes (mostly for primitives)
that will set up all Serialized, Produced,
+ // DefaultSerdes brings into scope implicit serdes (mostly for primitives)
that will set up all Serialized, Produced,
// Consumed and Joined instances. So all APIs below that accept
Serialized, Produced, Consumed or Joined will
// get these instances automatically
import Serdes._
@@ -84,7 +83,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
val clicksPerRegion: KTable[String, Long] =
userClicksStream
- // Join the stream against the table.
+ // Join the stream against the table.
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null)
"UNKNOWN" else region, clicks))
// Change the stream from <user> -> <region, clicks> to <region> ->
<clicks>
@@ -100,8 +99,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
val streams: KafkaStreams = new KafkaStreams(builder.build(),
streamsConfiguration)
streams.start()
-
- val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
streams.close()
@@ -126,29 +124,32 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
val builder: StreamsBuilderJ = new StreamsBuilderJ()
- val userClicksStream: KStreamJ[String, JLong] =
+ val userClicksStream: KStreamJ[String, JLong] =
builder.stream[String, JLong](userClicksTopicJ,
Consumed.`with`(Serdes.String, Serdes.JavaLong))
- val userRegionsTable: KTableJ[String, String] =
+ val userRegionsTable: KTableJ[String, String] =
builder.table[String, String](userRegionsTopicJ,
Consumed.`with`(Serdes.String, Serdes.String))
// Join the stream against the table.
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] =
userClicksStream
- .leftJoin(userRegionsTable,
+ .leftJoin(
+ userRegionsTable,
new ValueJoiner[JLong, String, (String, JLong)] {
- def apply(clicks: JLong, region: String): (String, JLong) =
+ def apply(clicks: JLong, region: String): (String, JLong) =
(if (region == null) "UNKNOWN" else region, clicks)
- },
- Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong,
Serdes.String))
+ },
+ Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong,
Serdes.String)
+ )
// Change the stream from <user> -> <region, clicks> to <region> ->
<clicks>
- val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
- .map {
+ val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
+ .map {
new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
- def apply(k: String, regionWithClicks: (String, JLong)) = new
KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+ def apply(k: String, regionWithClicks: (String, JLong)) =
+ new KeyValue[String, JLong](regionWithClicks._1,
regionWithClicks._2)
}
}
-
+
// Compute the total per region by summing the individual click counts per
region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
.groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
@@ -157,7 +158,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
def apply(v1: JLong, v2: JLong) = v1 + v2
}
}
-
+
// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String,
Serdes.JavaLong))
@@ -165,7 +166,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
streams.start()
- val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
streams.close()
@@ -214,17 +215,27 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
p
}
- private def produceNConsume(userClicksTopic: String, userRegionsTopic:
String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {
+ private def produceNConsume(userClicksTopic: String,
+ userRegionsTopic: String,
+ outputTopic: String):
java.util.List[KeyValue[String, Long]] = {
import collection.JavaConverters._
-
+
// Publish user-region information.
val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
- IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
userRegions.asJava, userRegionsProducerConfig, mockTime, false)
+ IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
+ userRegions.asJava,
+
userRegionsProducerConfig,
+ mockTime,
+ false)
// Publish user-click information.
val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
- IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
userClicks.asJava, userClicksProducerConfig, mockTime, false)
+ IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
+ userClicks.asJava,
+
userClicksProducerConfig,
+ mockTime,
+ false)
// consume and verify result
val consumerConfig = getConsumerConfig()
@@ -232,4 +243,3 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes
extends JUnitSuite
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
outputTopic, expectedClicksPerRegion.size)
}
}
-
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
index 45715a7..e9040ee 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
@@ -58,4 +58,3 @@ trait StreamToTableJoinTestData {
new KeyValue("asia", 124L)
)
}
-
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 3fc890c..8a0eabb 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -37,7 +37,7 @@ import collection.JavaConverters._
/**
* Test suite that verifies that the topology built by the Java and Scala APIs
match.
- */
+ */
class TopologyTest extends JUnitSuite {
val inputTopic = "input-topic"
@@ -52,22 +52,22 @@ class TopologyTest extends JUnitSuite {
def getTopologyScala(): TopologyDescription = {
import Serdes._
-
+
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
-
+
val _: KStream[String, String] =
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
-
+
streamBuilder.build().describe()
}
-
+
// build the Java topology
def getTopologyJava(): TopologyDescription = {
val streamBuilder = new StreamsBuilderJ
val textLines = streamBuilder.stream[String, String](inputTopic)
-
+
val _: KStreamJ[String, String] = textLines.flatMapValues {
new ValueMapper[String, java.lang.Iterable[String]] {
def apply(s: String): java.lang.Iterable[String] =
pattern.split(s.toLowerCase).toIterable.asJava
@@ -86,15 +86,16 @@ class TopologyTest extends JUnitSuite {
def getTopologyScala(): TopologyDescription = {
import Serdes._
-
+
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
-
+
val _: KTable[String, Long] =
- textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+ textLines
+ .flatMapValues(v => pattern.split(v.toLowerCase))
.groupBy((k, v) => v)
.count()
-
+
streamBuilder.build().describe()
}
@@ -103,21 +104,21 @@ class TopologyTest extends JUnitSuite {
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String,
String](inputTopic)
-
+
val splits: KStreamJ[String, String] = textLines.flatMapValues {
new ValueMapper[String, java.lang.Iterable[String]] {
def apply(s: String): java.lang.Iterable[String] =
pattern.split(s.toLowerCase).toIterable.asJava
}
}
-
+
val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
new KeyValueMapper[String, String, String] {
def apply(k: String, v: String): String = v
}
}
-
+
val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
-
+
streamBuilder.build().describe()
}
@@ -130,13 +131,13 @@ class TopologyTest extends JUnitSuite {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
import Serdes._
-
+
val builder = new StreamsBuilder()
-
+
val userClicksStream: KStream[String, Long] =
builder.stream(userClicksTopic)
-
+
val userRegionsTable: KTable[String, String] =
builder.table(userRegionsTopic)
-
+
val clicksPerRegion: KTable[String, Long] =
userClicksStream
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null)
"UNKNOWN" else region, clicks))
@@ -151,32 +152,35 @@ class TopologyTest extends JUnitSuite {
def getTopologyJava(): TopologyDescription = {
import java.lang.{Long => JLong}
-
+
val builder: StreamsBuilderJ = new StreamsBuilderJ()
-
- val userClicksStream: KStreamJ[String, JLong] =
+
+ val userClicksStream: KStreamJ[String, JLong] =
builder.stream[String, JLong](userClicksTopic,
Consumed.`with`(Serdes.String, Serdes.JavaLong))
-
- val userRegionsTable: KTableJ[String, String] =
+
+ val userRegionsTable: KTableJ[String, String] =
builder.table[String, String](userRegionsTopic,
Consumed.`with`(Serdes.String, Serdes.String))
-
+
// Join the stream against the table.
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] =
userClicksStream
- .leftJoin(userRegionsTable,
+ .leftJoin(
+ userRegionsTable,
new ValueJoiner[JLong, String, (String, JLong)] {
- def apply(clicks: JLong, region: String): (String, JLong) =
+ def apply(clicks: JLong, region: String): (String, JLong) =
(if (region == null) "UNKNOWN" else region, clicks)
- },
- Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong,
Serdes.String))
-
+ },
+ Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong,
Serdes.String)
+ )
+
// Change the stream from <user> -> <region, clicks> to <region> ->
<clicks>
- val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
- .map {
+ val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
+ .map {
new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]]
{
- def apply(k: String, regionWithClicks: (String, JLong)) = new
KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+ def apply(k: String, regionWithClicks: (String, JLong)) =
+ new KeyValue[String, JLong](regionWithClicks._1,
regionWithClicks._2)
}
}
-
+
// Compute the total per region by summing the individual click counts
per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
.groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 5abc1bc..5d858d8 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -50,7 +50,7 @@ import ImplicitConversions._
* <p>
* Note: In the current project settings SAM type conversion is turned off as
it's experimental in Scala 2.11.
* Hence the native Java API based version is more verbose.
- */
+ */
class WordCountTest extends JUnitSuite with WordCountTestData {
private val privateCluster: EmbeddedKafkaCluster = new
EmbeddedKafkaCluster(1)
@@ -61,11 +61,8 @@ class WordCountTest extends JUnitSuite with
WordCountTestData {
val mockTime: MockTime = cluster.time
mockTime.setCurrentTimeMs(alignedTime)
-
val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
@Rule def testFolder: TemporaryFolder = tFolder
-
-
@Before
def startKafkaCluster(): Unit = {
cluster.createTopic(inputTopic)
@@ -86,7 +83,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData
{
// generate word counts
val wordCounts: KTable[String, Long] =
- textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+ textLines
+ .flatMapValues(v => pattern.split(v.toLowerCase))
.groupBy((_, v) => v)
.count()
@@ -117,7 +115,8 @@ class WordCountTest extends JUnitSuite with
WordCountTestData {
// generate word counts
val wordCounts: KTable[String, Long] =
- textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+ textLines
+ .flatMapValues(v => pattern.split(v.toLowerCase))
.groupBy((k, v) => v)
.count()(Materialized.as("word-count"))
@@ -139,7 +138,12 @@ class WordCountTest extends JUnitSuite with
WordCountTestData {
@Test def testShouldCountWordsJava(): Unit = {
import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ,
StreamsBuilder => StreamsBuilderJ}
- import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream =>
KStreamJ, KGroupedStream => KGroupedStreamJ, _}
+ import org.apache.kafka.streams.kstream.{
+ KTable => KTableJ,
+ KStream => KStreamJ,
+ KGroupedStream => KGroupedStreamJ,
+ _
+ }
import collection.JavaConverters._
val streamsConfiguration = getStreamsConfiguration()
@@ -250,4 +254,3 @@ trait WordCountTestData {
new KeyValue("слова", 1L)
)
}
-