This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b88d70b MINOR: Make Serdes less confusing in Scala (#4963)
b88d70b is described below
commit b88d70b53290af715034a1f772a271f7e44505fd
Author: Joan Goyeau <[email protected]>
AuthorDate: Tue May 8 17:15:31 2018 +0100
MINOR: Make Serdes less confusing in Scala (#4963)
Serdes are confusing in the Scala wrapper:
* We have wrappers around Serializer, Deserializer and Serde which are not very useful.
* We have Serdes in two places, org.apache.kafka.common.serialization.Serdes and DefaultSerdes; instead there should be a single place to find all the Serdes.
I wanted to get this PR in before the release because it is a breaking change.
This doesn't add new functionality, so the current tests should be enough.
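For reviewers skimming the patch, a minimal sketch of the intended usage after this change, based on the Serdes.scala added below. The User case class, the UTF-8 encoding and the helper names around it are made up for illustration and are not part of this commit:
    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.scala.Serdes      // single home for all Serdes
    import org.apache.kafka.streams.scala.Serdes._    // primitive serdes as implicits (String, Long, ...)

    // Hypothetical value type used only for this example.
    case class User(name: String)

    // Custom serde built with the new fromFn helper instead of the removed ScalaSerde traits.
    val serializeUser: User => Array[Byte] = user => user.name.getBytes("UTF-8")
    val deserializeUser: Array[Byte] => Option[User] = bytes => Some(User(new String(bytes, "UTF-8")))
    implicit val userSerde: Serde[User] = Serdes.fromFn(serializeUser, deserializeUser)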
Reviewers: Debasish Ghosh <[email protected]>, Guozhang Wang
<[email protected]>
---
docs/streams/developer-guide/dsl-api.html | 11 ++--
docs/streams/index.html | 2 +-
.../apache/kafka/streams/scala/DefaultSerdes.scala | 47 --------------
.../apache/kafka/streams/scala/ScalaSerde.scala | 70 ---------------------
.../org/apache/kafka/streams/scala/Serdes.scala | 71 ++++++++++++++++++++++
.../kafka/streams/scala/StreamsBuilder.scala | 4 +-
.../kafka/streams/scala/kstream/KStream.scala | 12 ++--
...bleJoinScalaIntegrationTestImplicitSerdes.scala | 16 ++---
.../apache/kafka/streams/scala/TopologyTest.scala | 14 ++---
.../apache/kafka/streams/scala/WordCountTest.scala | 10 +--
10 files changed, 105 insertions(+), 152 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 2b25072..687dff9 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3165,8 +3165,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
<p>The library also has several utility abstractions and modules that the user needs to use for proper semantics.</p>
<ul>
<li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ImplicitConversions</span></code>: Module that brings into scope the implicit conversions between the Scala and Java classes.</li>
- <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.DefaultSerdes</span></code>: Module that brings into scope the implicit values of all primitive SerDes.</li>
- <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ScalaSerde</span></code>: Base abstraction that can be used to implement custom SerDes in a type safe way.</li>
+ <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.Serdes</span></code>: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.</li>
</ul>
<p>The library is cross-built with Scala 2.11 and 2.12. To reference the library compiled against Scala 2.11 include the following in your maven <code>pom.xml</code> add the following:</p>
<pre class="brush: xml;">
@@ -3197,7 +3196,7 @@ import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
- import DefaultSerdes._
+ import Serdes._
val config: Properties = {
val p = new Properties()
@@ -3235,7 +3234,7 @@ object WordCountApplication extends App {
// that will set up all Serialized, Produced, Consumed and Joined instances.
// So all APIs below that accept Serialized, Produced, Consumed or Joined will
// get these instances automatically
-import DefaultSerdes._
+import Serdes._
val builder = new StreamsBuilder()
@@ -3260,7 +3259,7 @@ clicksPerRegion.toStream.to(outputTopic)
<p>Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:</p>
<ol>
<li>The code snippet does not depend on any config defined SerDes. In fact any SerDes defined as part of the config will be ignored.</li>
- <li>All SerDes are picked up from the implicits in scope. And <code class="docutils literal"><span class="pre">import DefaultSerdes._</span></code> brings all necessary SerDes in scope.</li>
+ <li>All SerDes are picked up from the implicits in scope. And <code class="docutils literal"><span class="pre">import Serdes._</span></code> brings all necessary SerDes in scope.</li>
<li>This is an example of compile time type safety that we don't have in the Java APIs.</li>
<li>The code looks less verbose and more focused towards the actual transformation that it does on the data stream.</li>
</ol>
@@ -3277,7 +3276,7 @@ case class UserClicks(clicks: Long)
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
// Primitive SerDes
-import DefaultSerdes._
+import Serdes._
// And then business as usual ..
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 72e1323..6dfaf6b 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -261,7 +261,7 @@ import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
- import DefaultSerdes._
+ import Serdes._
val config: Properties = {
val p = new Properties()
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
deleted file mode 100644
index 3f2840e..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import java.nio.ByteBuffer
-
-import org.apache.kafka.common.serialization.{Serde, Serdes}
-import org.apache.kafka.common.utils.Bytes
-import org.apache.kafka.streams.kstream.WindowedSerdes
-
-
-/**
- * Implicit values for default serdes.
- * <p>
- * Bring them in scope for default serializers / de-serializers to work.
- */
-object DefaultSerdes {
-  implicit val stringSerde: Serde[String] = Serdes.String()
-  implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
-  implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
-  implicit val bytesSerde: Serde[Bytes] = Serdes.Bytes()
-  implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
-  implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
-  implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
-  implicit val shortSerde: Serde[Short] = Serdes.Short().asInstanceOf[Serde[Short]]
-  implicit val byteBufferSerde: Serde[ByteBuffer] = Serdes.ByteBuffer()
-
-  implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
-  implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
deleted file mode 100644
index 06afcae..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import org.apache.kafka.common.serialization.{Serde, Deserializer => JDeserializer, Serializer => JSerializer}
-
-trait ScalaSerde[T] extends Serde[T] {
-  override def deserializer(): JDeserializer[T]
-
-  override def serializer(): JSerializer[T]
-
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def close(): Unit = ()
-}
-
-trait SimpleScalaSerde[T >: Null] extends Serde[T] with ScalaSerde[T] {
-  def serialize(data: T): Array[Byte]
-  def deserialize(data: Array[Byte]): Option[T]
-
-  private def outerSerialize(data: T): Array[Byte] = serialize(data)
-  private def outerDeserialize(data: Array[Byte]): Option[T] = deserialize(data)
-
-  override def deserializer(): Deserializer[T] = new Deserializer[T] {
-    override def deserialize(data: Array[Byte]): Option[T] = outerDeserialize(data)
-  }
-
-  override def serializer(): Serializer[T] = new Serializer[T] {
-    override def serialize(data: T): Array[Byte] = outerSerialize(data)
-  }
-}
-
-trait Deserializer[T >: Null] extends JDeserializer[T] {
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def close(): Unit = ()
-
-  override def deserialize(topic: String, data: Array[Byte]): T =
-    Option(data).flatMap(deserialize).orNull
-
-  def deserialize(data: Array[Byte]): Option[T]
-}
-
-trait Serializer[T] extends JSerializer[T] {
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def close(): Unit = ()
-
-  override def serialize(topic: String, data: T): Array[Byte] =
-    Option(data).map(serialize).orNull
-
-  def serialize(data: T): Array[Byte]
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
new file mode 100644
index 0000000..a0ffffa
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, Serdes => JSerdes}
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+object Serdes {
+  implicit val String: Serde[String] = JSerdes.String()
+  implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+  implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+  implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+  implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+
+  implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
+  implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
+
+  def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
+    JSerdes.serdeFrom(
+      new Serializer[T] {
+        override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit = ()
+      },
+      new Deserializer[T] {
+        override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit = ()
+      }
+    )
+
+  def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
+                        deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+    JSerdes.serdeFrom(
+      new Serializer[T] {
+        override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit = ()
+      },
+      new Deserializer[T] {
+        override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit = ()
+      }
+    )
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 9e6e204..397af32 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -48,7 +48,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* import ImplicitConversions._
*
* // Bring implicit default serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val builder = new StreamsBuilder()
*
@@ -98,7 +98,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* import ImplicitConversions._
*
* // Bring implicit default serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val builder = new StreamsBuilder()
*
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 7634b95..e3e8470 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -206,7 +206,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* //..
* val clicksPerRegion: KTable[String, Long] = //..
@@ -238,7 +238,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* //..
* val clicksPerRegion: KTable[String, Long] = //..
@@ -354,7 +354,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val clicksPerRegion: KTable[String, Long] =
* userClicksStream
@@ -362,7 +362,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* .map((_, regionWithClicks) => regionWithClicks)
*
* // the groupByKey gets the Serialized instance through an implicit conversion of the
- * // serdes brought into scope through the import DefaultSerdes._ above
+ * // serdes brought into scope through the import Serdes._ above
* .groupByKey
* .reduce(_ + _)
*
@@ -388,7 +388,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val textLines = streamBuilder.stream[String, String](inputTopic)
*
@@ -398,7 +398,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* textLines.flatMapValues(v => pattern.split(v.toLowerCase))
*
* // the groupBy gets the Serialized instance through an implicit conversion of the
- * // serdes brought into scope through the import DefaultSerdes._ above
+ * // serdes brought into scope through the import Serdes._ above
* .groupBy((k, v) => v)
*
* .count()
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index e701431..113458e 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -73,7 +73,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
    // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
    // get these instances automatically
- import DefaultSerdes._
+ import Serdes._
val streamsConfiguration: Properties = getStreamsConfiguration()
@@ -122,16 +122,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
    val streamsConfiguration: Properties = getStreamsConfiguration()
-    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
-    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    val builder: StreamsBuilderJ = new StreamsBuilderJ()
    val userClicksStream: KStreamJ[String, JLong] =
-      builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String(), Serdes.Long()))
+      builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String, Serdes.JavaLong))
    val userRegionsTable: KTableJ[String, String] =
-      builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String(), Serdes.String()))
+      builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String, Serdes.String))
// Join the stream against the table.
    val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
@@ -140,7 +140,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
          def apply(clicks: JLong, region: String): (String, JLong) =
            (if (region == null) "UNKNOWN" else region, clicks)
        },
-        Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String()))
+        Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
    // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
@@ -152,7 +152,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
    // Compute the total per region by summing the individual click counts per region.
    val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-      .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+      .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
      .reduce {
        new Reducer[JLong] {
          def apply(v1: JLong, v2: JLong) = v1 + v2
@@ -160,7 +160,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
      }
    // Write the (continuously updating) results to the output topic.
-    clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+    clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
    val streams: KafkaStreamsJ = new KafkaStreamsJ(builder.build(), streamsConfiguration)
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 71d4834..9495ea7 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -52,7 +52,7 @@ class TopologyTest extends JUnitSuite {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
- import DefaultSerdes._
+ import Serdes._
import collection.JavaConverters._
val streamBuilder = new StreamsBuilder
@@ -87,7 +87,7 @@ class TopologyTest extends JUnitSuite {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
- import DefaultSerdes._
+ import Serdes._
import collection.JavaConverters._
val streamBuilder = new StreamsBuilder
@@ -132,7 +132,7 @@ class TopologyTest extends JUnitSuite {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
- import DefaultSerdes._
+ import Serdes._
val builder = new StreamsBuilder()
@@ -158,10 +158,10 @@ class TopologyTest extends JUnitSuite {
val builder: StreamsBuilderJ = new StreamsBuilderJ()
val userClicksStream: KStreamJ[String, JLong] =
-        builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String(), Serdes.Long()))
+        builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong))
      val userRegionsTable: KTableJ[String, String] =
-        builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String(), Serdes.String()))
+        builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String))
// Join the stream against the table.
      val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
@@ -170,7 +170,7 @@ class TopologyTest extends JUnitSuite {
            def apply(clicks: JLong, region: String): (String, JLong) =
              (if (region == null) "UNKNOWN" else region, clicks)
          },
-          Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String()))
+          Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
      // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
@@ -182,7 +182,7 @@ class TopologyTest extends JUnitSuite {
      // Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
- .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+ .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index e827a3c..17fa35c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -75,7 +75,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
@Test def testShouldCountWords(): Unit = {
- import DefaultSerdes._
+ import Serdes._
val streamsConfiguration = getStreamsConfiguration()
@@ -112,8 +112,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
import collection.JavaConverters._
val streamsConfiguration = getStreamsConfiguration()
-    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
-    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    val streamBuilder = new StreamsBuilderJ
    val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)
@@ -134,7 +134,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
    val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
-    wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+    wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
    val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
streams.start()
@@ -153,7 +153,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath())
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
streamsConfiguration
}
--
To stop receiving notification emails like this one, please contact
[email protected].