This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d8eddc6 MINOR: Add missing imports to 'Hello Kafka Streams' examples
(#4535)
d8eddc6 is described below
commit d8eddc6e16012e43ab832d9f522d04429fc56227
Author: Daniel Wojda <[email protected]>
AuthorDate: Thu Feb 8 17:53:57 2018 +0000
MINOR: Add missing imports to 'Hello Kafka Streams' examples (#4535)
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang
<[email protected]>
---
docs/streams/index.html | 45 +++++++++++++++++++++++++--------------------
1 file changed, 25 insertions(+), 20 deletions(-)
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 904338e..fe8504d 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -153,26 +153,28 @@
<div class="code-example__snippet b-java-8 selected">
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
+ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.Topology;
+ import org.apache.kafka.streams.kstream.KStream;
+ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
-
+
import java.util.Arrays;
import java.util.Properties;
-
+
public class WordCountApplication {
-
+
public static void main(final String[] args) throws
Exception {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-broker1:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
+
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines =
builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
@@ -180,11 +182,11 @@
.groupBy((key, word) -> word)
.count(Materialized.<String, Long,
KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic",
Produced.with(Serdes.String(), Serdes.Long()));
-
+
KafkaStreams streams = new
KafkaStreams(builder.build(), config);
streams.start();
}
-
+
}
</pre>
</div>
@@ -192,14 +194,16 @@
<div class="code-example__snippet b-java-7">
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
+ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.Topology;
+ import org.apache.kafka.streams.kstream.KStream;
+ import org.apache.kafka.streams.kstream.KTable;
+ import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
- import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
@@ -247,16 +251,17 @@
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit
-
+
import org.apache.kafka.common.serialization._
+ import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
- import org.apache.kafka.streams.kstream.{KeyValueMapper,
Materialized, Produced, ValueMapper}
- import org.apache.kafka.streams.state.KeyValueStore;
-
+ import org.apache.kafka.streams.kstream.{KStream, KTable,
Materialized, Produced}
+ import org.apache.kafka.streams.state.KeyValueStore
+
import
scala.collection.JavaConverters.asJavaIterableConverter
-
+
object WordCountApplication {
-
+
def main(args: Array[String]) {
val config: Properties = {
val p = new Properties()
@@ -266,23 +271,23 @@
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p
}
-
+
val builder: StreamsBuilder = new StreamsBuilder()
val textLines: KStream[String, String] =
builder.stream("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine =>
textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long,
KeyValueStore[Bytes, Array[Byte]]]])
- wordCounts.toStream().to("WordsWithCountsTopic",
Produced.with(Serdes.String(), Serdes.Long()))
-
+ wordCounts.toStream().to("WordsWithCountsTopic",
Produced.`with`(Serdes.String(), Serdes.Long()))
+
val streams: KafkaStreams = new
KafkaStreams(builder.build(), config)
streams.start()
-
+
Runtime.getRuntime.addShutdownHook(new Thread(() =>
{
streams.close(10, TimeUnit.SECONDS)
}))
}
-
+
}
</pre>
</div>
--
To stop receiving notification emails like this one, please contact
[email protected].