This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b1539ff KAFKA-7250: switch scala transform to TransformSupplier
(#5481)
b1539ff is described below
commit b1539ff62dd3e8eeffeae526a2cbd7e4cb63ecda
Author: John Roesler <[email protected]>
AuthorDate: Thu Aug 9 12:11:48 2018 -0500
KAFKA-7250: switch scala transform to TransformSupplier (#5481)
#5468 introduced a breaking API change that was actually avoidable. This PR
re-introduces the old API as deprecated and alters the API introduced by #5468
to be consistent with the other methods.
Also fixed miscellaneous syntax problems.
---
build.gradle | 1 +
.../kafka/streams/scala/kstream/KStream.scala | 30 ++++++++++++----------
.../apache/kafka/streams/scala/TopologyTest.scala | 28 ++++++++++----------
3 files changed, 31 insertions(+), 28 deletions(-)
diff --git a/build.gradle b/build.gradle
index 83b169b..7f5a1d9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1015,6 +1015,7 @@ project(':streams:streams-scala') {
testCompile libs.junit
testCompile libs.scalatest
+ testCompile libs.easymock
testRuntime libs.slf4jlog4j
}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index a8766bd..adc1850 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier,
TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext,
ProcessorSupplier, TopicNameExtractor}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
/**
* Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and
delegates method calls to the underlying Java object.
*
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
* @param inner The underlying Java abstraction for KStream
*
* @see `org.apache.kafka.streams.kstream.KStream`
@@ -167,7 +167,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def print(printed: Printed[K, V]): Unit = inner.print(printed)
/**
- * Perform an action on each record of 'KStream`
+ * Perform an action on each record of `KStream`
*
* @param action an action to perform on each record
* @see `org.apache.kafka.streams.kstream.KStream#foreach`
@@ -176,14 +176,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.foreach((k: K, v: V) => action(k, v))
/**
- * Creates an array of {@code KStream} from this stream by branching the
records in the original stream based on
+ * Creates an array of `KStream` from this stream by branching the records
in the original stream based on
* the supplied predicates.
*
* @param predicates the ordered list of functions that return a Boolean
* @return multiple distinct substreams of this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#branch`
*/
- def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
+ //noinspection ScalaUnnecessaryParentheses
+ def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
inner.branch(predicates.map(_.asPredicate): _*).map(kstream =>
wrapKStream(kstream))
/**
@@ -211,7 +212,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* }}}
*
* @param topic the topic name
- * @param (implicit) produced the instance of Produced that gives the serdes
and `StreamPartitioner`
+ * @param produced the instance of Produced that gives the serdes and
`StreamPartitioner`
* @return a [[KStream]] that contains the exact same (and potentially
repartitioned) records as this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#through`
*/
@@ -243,7 +244,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* }}}
*
* @param topic the topic name
- * @param (implicit) produced the instance of Produced that gives the serdes
and `StreamPartitioner`
+ * @param produced the instance of Produced that gives the serdes and
`StreamPartitioner`
* @see `org.apache.kafka.streams.kstream.KStream#to`
*/
def to(topic: String)(implicit produced: Produced[K, V]): Unit =
@@ -275,7 +276,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* }}}
*
* @param extractor the extractor to determine the name of the Kafka topic
to write to for reach record
- * @param (implicit) produced the instance of Produced that gives the serdes
and `StreamPartitioner`
+ * @param produced the instance of Produced that gives the serdes and
`StreamPartitioner`
* @see `org.apache.kafka.streams.kstream.KStream#to`
*/
def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K,
V]): Unit =
@@ -295,9 +296,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains more or less records with new key and
value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
- def transform[K1, V1](transformerSupplier: () => Transformer[K, V,
KeyValue[K1, V1]],
+ def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V,
KeyValue[K1, V1]],
stateStoreNames: String*): KStream[K1, V1] =
- inner.transform(transformerSupplier.asTransformerSupplier,
stateStoreNames: _*)
+ inner.transform(transformerSupplier, stateStoreNames: _*)
/**
* Transform the value of each input record into a new value (with possible
new type) of the output record.
@@ -337,11 +338,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* In order to assign a state, the state must be created and registered
* beforehand via stores added via `addStateStore` or `addGlobalStore`
before they can be connected to the `Transformer`
*
- * @param processorSupplier a function that generates a
[[org.apache.kafka.stream.Processor]]
+ * @param processorSupplier a function that generates a
[[org.apache.kafka.streams.processor.Processor]]
* @param stateStoreNames the names of the state store used by the
processor
* @see `org.apache.kafka.streams.kstream.KStream#process`
*/
def process(processorSupplier: () => Processor[K, V], stateStoreNames:
String*): Unit = {
+ //noinspection ConvertExpressionToSAM // because of the 2.11 build
val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K,
V] {
override def get(): Processor[K, V] = processorSupplier()
}
@@ -374,7 +376,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* // to the groupByKey call
* }}}
*
- * @param (implicit) serialized the instance of Serialized that gives the
serdes
+ * @param serialized the instance of Serialized that gives the serdes
* @return a [[KGroupedStream]] that contains the grouped records of the
original [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
*/
@@ -564,7 +566,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner)
/**
- * Perform an action on each record of {@code KStream}.
+ * Perform an action on each record of `KStream`.
* <p>
* Peek is a non-terminal operation that triggers a side effect (such as
logging or statistics collection)
* and returns an unchanged stream.
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 8a0eabb..b596dd3 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -21,19 +21,16 @@ package org.apache.kafka.streams.scala
import java.util.regex.Pattern
-import org.scalatest.junit.JUnitSuite
-import org.junit.Assert._
-import org.junit._
-
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ,
KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream._
-
-import ImplicitConversions._
-
import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
-import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream =>
KStreamJ, KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.streams.processor.ProcessorContext
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
-import collection.JavaConverters._
+import _root_.scala.collection.JavaConverters._
/**
* Test suite that verifies that the topology built by the Java and Scala APIs
match.
@@ -207,17 +204,20 @@ class TopologyTest extends JUnitSuite {
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
+ //noinspection ConvertExpressionToSAM due to 2.11 build
val _: KTable[String, Long] =
textLines
- .transform(
- () =>
+ .transform(new TransformerSupplier[String, String, KeyValue[String,
String]] {
+ override def get(): Transformer[String, String, KeyValue[String,
String]] =
new Transformer[String, String, KeyValue[String, String]] {
override def init(context: ProcessorContext): Unit = Unit
+
override def transform(key: String, value: String):
KeyValue[String, String] =
new KeyValue(key, value.toLowerCase)
+
override def close(): Unit = Unit
- }
- )
+ }
+ })
.groupBy((k, v) => v)
.count()