This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed13d7e KAFKA-7250: fix transform function in scala DSL to accept
TransformerSupplier (#5468)
ed13d7e is described below
commit ed13d7eebb0a8c2155ac50fce16c12ec502c1f0d
Author: Michal Dziemianko <[email protected]>
AuthorDate: Tue Aug 7 16:00:22 2018 +0100
KAFKA-7250: fix transform function in scala DSL to accept
TransformerSupplier (#5468)
Reviewers: Guozhang Wang <[email protected]>
---
.../kafka/streams/scala/FunctionConversions.scala | 6 +++
.../kafka/streams/scala/kstream/KStream.scala | 32 ++++--------
.../apache/kafka/streams/scala/TopologyTest.scala | 59 ++++++++++++++++++++++
3 files changed, 75 insertions(+), 22 deletions(-)
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 4a4c3b0..65ea490 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -105,4 +105,10 @@ object FunctionConversions {
override def apply(): VA = f()
}
}
+
+ implicit class TransformerSupplierFromFunction[K, V, VO](val f: () =>
Transformer[K, V, VO]) extends AnyVal {
+ def asTransformerSupplier: TransformerSupplier[K, V, VO] = new
TransformerSupplier[K, V, VO] {
+ override def get(): Transformer[K, V, VO] = f()
+ }
+ }
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 8f6aab8..c02939a 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorContext,
ProcessorSupplier, TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier,
TopicNameExtractor}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
@@ -284,33 +284,21 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
/**
* Transform each record of the input stream into zero or more records in
the output stream (both key and value type
* can be altered arbitrarily).
- * A `Transformer` is applied to each input record and computes zero or more
output records. In order to assign a
- * state, the state must be created and registered beforehand via stores
added via `addStateStore` or `addGlobalStore`
+ * A `Transformer` (provided by the given `TransformerSupplier`) is applied
to each input record
+ * and computes zero or more output records.
+ * In order to assign a state, the state must be created and registered
+ * beforehand via stores added via `addStateStore` or `addGlobalStore`
* before they can be connected to the `Transformer`
*
- * @param transformer the `Transformer` instance
+ * @param transformerSupplier the `TransformerSupplier` that generates
`Transformer`
* @param stateStoreNames the names of the state stores used by the
processor
* @return a [[KStream]] that contains more or less records with new key and
value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
- def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
stateStoreNames: String*): KStream[K1, V1] = {
- val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] =
- new TransformerSupplier[K, V, KeyValue[K1, V1]] {
- override def get(): Transformer[K, V, KeyValue[K1, V1]] =
- new Transformer[K, V, KeyValue[K1, V1]] {
- override def transform(key: K, value: V): KeyValue[K1, V1] =
- transformer.transform(key, value) match {
- case (k1, v1) => KeyValue.pair(k1, v1)
- case _ => null
- }
-
- override def init(context: ProcessorContext): Unit =
transformer.init(context)
-
- override def close(): Unit = transformer.close()
- }
- }
- inner.transform(transformerSupplierJ, stateStoreNames: _*)
- }
+ def transform[K1, V1](transformerSupplier: () => Transformer[K, V,
KeyValue[K1, V1]],
+ stateStoreNames: String*): KStream[K1, V1] =
+ inner.transform(transformerSupplier.asTransformerSupplier,
stateStoreNames: _*)
+
/**
* Transform the value of each input record into a new value (with possible
new type) of the output record.
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index f04ec5d..194abf5 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -31,6 +31,8 @@ import ImplicitConversions._
import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream =>
KStreamJ, KGroupedStream => KGroupedStreamJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+
import collection.JavaConverters._
/**
@@ -194,4 +196,61 @@ class TopologyTest extends JUnitSuite {
// should match
assertEquals(getTopologyScala(), getTopologyJava())
}
+
+ @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = {
+
+ // build the Scala topology
+ def getTopologyScala(): TopologyDescription = {
+
+ import Serdes._
+
+ val streamBuilder = new StreamsBuilder
+ val textLines = streamBuilder.stream[String, String](inputTopic)
+
+ val _: KTable[String, Long] =
+ textLines
+ .transform(() => new Transformer[String, String, KeyValue[String,
String]] {
+ override def init(context: ProcessorContext): Unit = Unit
+ override def transform(key: String, value: String):
KeyValue[String, String] =
+ new KeyValue(key, value.toLowerCase)
+ override def close(): Unit = Unit
+ })
+ .groupBy((k, v) => v)
+ .count()
+
+ streamBuilder.build().describe()
+ }
+
+ // build the Java topology
+ def getTopologyJava(): TopologyDescription = {
+
+ val streamBuilder = new StreamsBuilderJ
+ val textLines: KStreamJ[String, String] = streamBuilder.stream[String,
String](inputTopic)
+
+ val lowered: KStreamJ[String, String] = textLines
+ .transform(new TransformerSupplier[String, String, KeyValue[String,
String]] {
+ override def get(): Transformer[String, String, KeyValue[String,
String]] = new Transformer[String, String, KeyValue[String, String]] {
+ override def init(context: ProcessorContext): Unit = Unit
+
+ override def transform(key: String, value: String): KeyValue[String,
String] =
+ new KeyValue(key, value.toLowerCase)
+
+ override def close(): Unit = Unit
+ }
+ })
+
+ val grouped: KGroupedStreamJ[String, String] = lowered.groupBy {
+ new KeyValueMapper[String, String, String] {
+ def apply(k: String, v: String): String = v
+ }
+ }
+
+ val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+
+ streamBuilder.build().describe()
+ }
+
+ // should match
+ assertEquals(getTopologyScala(), getTopologyJava())
+ }
}