This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 893e044 MINOR: Build and code sample updates for Kafka Streams DSL
for Scala (#4949)
893e044 is described below
commit 893e0445150614d3538654af0e25f78d87a717ba
Author: Sean Glover <[email protected]>
AuthorDate: Sun May 6 23:55:12 2018 -0400
MINOR: Build and code sample updates for Kafka Streams DSL for Scala (#4949)
Several build and documentation updates were required after the merge of
KAFKA-6670: Implement a Scala wrapper library for Kafka Streams.
Encode Scala major version into streams-scala artifacts.
To differentiate versions of the kafka-streams-scala artifact across Scala
major versions it's required to encode the version into the artifact name
before its published to a maven repository. This is accomplished by following a
similar release process as kafka core, which encodes the Scala major version
and then runs the build for each major version of Scala supported. This is
considered standard practice when releasing Scala libraries, but is not handled
for us automatically with th [...]
After this change you can generate and install the kafka-streams-scala
artifact into the local maven repository:
$ ./gradlew -PscalaVersion=2.11 install
$ ./gradlew -PscalaVersion=2.12 install
Reviewers: Ismael Juma <[email protected]>, Guozhang Wang
<[email protected]>
---
build.gradle | 3 +--
docs/streams/developer-guide/dsl-api.html | 14 ++++++++------
docs/streams/index.html | 7 ++++---
...reamToTableJoinScalaIntegrationTestImplicitSerdes.scala | 3 +--
.../org/apache/kafka/streams/scala/TopologyTest.scala | 3 +--
.../org/apache/kafka/streams/scala/WordCountTest.scala | 3 +--
6 files changed, 16 insertions(+), 17 deletions(-)
diff --git a/build.gradle b/build.gradle
index d60ca8f..31026d9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -981,7 +981,7 @@ project(':streams') {
project(':streams:streams-scala') {
println "Building project 'streams-scala' with Scala version
${versions.scala}"
apply plugin: 'scala'
- archivesBaseName = "kafka-streams-scala"
+ archivesBaseName = "kafka-streams-scala_${versions.baseScala}"
dependencies {
compile project(':streams')
@@ -992,7 +992,6 @@ project(':streams:streams-scala') {
testCompile project(':core').sourceSets.test.output
testCompile project(':streams').sourceSets.test.output
testCompile project(':clients').sourceSets.test.output
- testCompile libs.scalaLogging
testCompile libs.junit
testCompile libs.scalatest
diff --git a/docs/streams/developer-guide/dsl-api.html
b/docs/streams/developer-guide/dsl-api.html
index ce60654..2b25072 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3191,11 +3191,13 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
import DefaultSerdes._
- import ImplicitConversions._
val config: Properties = {
val p = new Properties()
@@ -3204,9 +3206,9 @@ object WordCountApplication extends App {
p
}
- val builder = new StreamsBuilder()
- val textLines = builder.stream[String, String]("TextLinesTopic")
- val wordCounts = textLines
+ val builder: StreamsBuilder = new StreamsBuilder
+ val textLines: KStream[String, String] = builder.stream[String,
String]("TextLinesTopic")
+ val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
.groupBy((_, word) => word)
.count(Materialized.as("counts-store"))
@@ -3216,7 +3218,7 @@ object WordCountApplication extends App {
streams.start()
sys.ShutdownHookThread {
- streams.close(10, TimeUnit.SECONDS)
+ streams.close(10, TimeUnit.SECONDS)
}
}
</pre>
@@ -3290,7 +3292,7 @@ val clicksPerRegion: KTable[String, Long] =
// Join the stream against the table.
.leftJoin(userRegionsTable, (clicks: UserClicks, region: String) => (if
(region == null) "UNKNOWN" else region, clicks.clicks))
- // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+ // Change the stream from <user> -> <region, clicks> to
<region> -> <clicks>
.map((_, regionWithClicks) => regionWithClicks)
// Compute the total per region by summing the individual click counts per
region.
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 8992fc5..72e1323 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -255,12 +255,13 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
import DefaultSerdes._
- import ImplicitConversions._
val config: Properties = {
val p = new Properties()
@@ -269,7 +270,7 @@ object WordCountApplication extends App {
p
}
- val builder: StreamsBuilder = new StreamsBuilder()
+ val builder: StreamsBuilder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String,
String]("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
@@ -281,7 +282,7 @@ object WordCountApplication extends App {
streams.start()
sys.ShutdownHookThread {
- streams.close(10, TimeUnit.SECONDS)
+ streams.close(10, TimeUnit.SECONDS)
}
}
</pre>
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 24974c4..e701431 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.streams._
import org.apache.kafka.streams.scala.kstream._
import ImplicitConversions._
-import com.typesafe.scalalogging.LazyLogging
/**
* Test suite that does an example to demonstrate stream-table joins in Kafka
Streams
@@ -46,7 +45,7 @@ import com.typesafe.scalalogging.LazyLogging
* Hence the native Java API based version is more verbose.
*/
class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
- with StreamToTableJoinTestData with LazyLogging {
+ with StreamToTableJoinTestData {
private val privateCluster: EmbeddedKafkaCluster = new
EmbeddedKafkaCluster(1)
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 89b2935..71d4834 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.common.serialization._
import ImplicitConversions._
-import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder
=> StreamsBuilderJ, _}
import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream =>
KStreamJ, KGroupedStream => KGroupedStreamJ, _}
@@ -40,7 +39,7 @@ import collection.JavaConverters._
/**
* Test suite that verifies that the topology built by the Java and Scala APIs
match.
*/
-class TopologyTest extends JUnitSuite with LazyLogging {
+class TopologyTest extends JUnitSuite {
val inputTopic = "input-topic"
val userClicksTopic = "user-clicks-topic"
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index f71e0cb..e827a3c 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -40,7 +40,6 @@ import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.test.TestUtils
import ImplicitConversions._
-import com.typesafe.scalalogging.LazyLogging
/**
* Test suite that does a classic word count example.
@@ -51,7 +50,7 @@ import com.typesafe.scalalogging.LazyLogging
* Note: In the current project settings SAM type conversion is turned off as
it's experimental in Scala 2.11.
* Hence the native Java API based version is more verbose.
*/
-class WordCountTest extends JUnitSuite with WordCountTestData with LazyLogging
{
+class WordCountTest extends JUnitSuite with WordCountTestData {
private val privateCluster: EmbeddedKafkaCluster = new
EmbeddedKafkaCluster(1)
--
To stop receiving notification emails like this one, please contact
[email protected].