Repository: kafka Updated Branches: refs/heads/0.11.0 5363cd4c6 -> 60d2f1294
http://git-wip-us.apache.org/repos/asf/kafka/blob/60d2f129/docs/streams.html ---------------------------------------------------------------------- diff --git a/docs/streams.html b/docs/streams.html index bff9bf0..8192bf7 100644 --- a/docs/streams.html +++ b/docs/streams.html @@ -306,7 +306,7 @@ The following example <code>Processor</code> implementation defines a simple word-count algorithm: </p> -<pre> +<pre class="brush: java;"> public class MyProcessor implements Processor<String, String> { private ProcessorContext context; private KeyValueStore<String, Long> kvStore; @@ -380,7 +380,7 @@ public class MyProcessor implements Processor<String, String> { by connecting these processors together: </p> -<pre> +<pre class="brush: java;"> TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", "src-topic") @@ -424,7 +424,7 @@ builder.addSource("SOURCE", "src-topic") In the following example, a persistent key-value store named âCountsâ with key type <code>String</code> and value type <code>Long</code> is created. </p> -<pre> +<pre class="brush: java;"> StateStoreSupplier countStore = Stores.create("Counts") .withKeys(Serdes.String()) .withValues(Serdes.Long()) @@ -438,7 +438,7 @@ StateStoreSupplier countStore = Stores.create("Counts") state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>. </p> -<pre> +<pre class="brush: java;"> TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", "src-topic") @@ -526,7 +526,7 @@ builder.addSource("SOURCE", "src-topic") from a single topic). </p> -<pre> +<pre class="brush: java;"> KStreamBuilder builder = new KStreamBuilder(); KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2"); @@ -606,7 +606,7 @@ GlobalKTable<String, GenericRecord> source2 = builder.globalTable("topic4" </p> -<pre> +<pre class="brush: java;"> // written in Java 8+, using lambda expressions KStream<String, GenericRecord> mapped = source1.mapValue(record -> record.get("category")); </pre> @@ -620,7 +620,7 @@ KStream<String, GenericRecord> mapped = source1.mapValue(record -> record. based on them. </p> -<pre> +<pre class="brush: java;"> // written in Java 8+, using lambda expressions KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate( () -> 0L, // initial value @@ -641,7 +641,7 @@ KStream<String, String> joined = source1.leftJoin(source2, <code>KStream.to</code> and <code>KTable.to</code>. </p> -<pre> +<pre class="brush: java;"> joined.to("topic4"); </pre> @@ -649,7 +649,7 @@ joined.to("topic4"); to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic; Kafka Streams provides a convenience method called <code>through</code>: -<pre> +<pre class="brush: java;"> // equivalent to // // joined.to("topic4"); @@ -677,7 +677,7 @@ KStream<String, String> materialized = joined.through("topic4"); set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance. </p> -<pre> +<pre class="brush: java;"> import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; @@ -702,7 +702,7 @@ StreamsConfig config = new StreamsConfig(settings); If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>: </p> -<pre> +<pre class="brush: java;"> Properties settings = new Properties(); // Example of a "normal" setting for Kafka Streams settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092"); @@ -750,7 +750,7 @@ settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG), that is used to define a topology; The second argument is an instance of <code>StreamsConfig</code> mentioned above. </p> -<pre> +<pre class="brush: java;"> import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -778,7 +778,7 @@ KafkaStreams streams = new KafkaStreams(builder, config); At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method: </p> -<pre> +<pre class="brush: java;"> // Start the Kafka Streams instance streams.start(); </pre> @@ -787,7 +787,7 @@ streams.start(); To catch any unexpected exceptions, you may set an <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception: </p> -<pre> +<pre class="brush: java;"> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public uncaughtException(Thread t, throwable e) { // here you should examine the exception and perform an appropriate action! @@ -799,7 +799,7 @@ streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { To stop the application instance call the <code>close()</code> method: </p> -<pre> +<pre class="brush: java;"> // Stop the Kafka Streams instance streams.close(); </pre> @@ -807,7 +807,7 @@ streams.close(); Now it's time to execute your application that uses the Kafka Streams library, which can be run just like any other Java application â there is no special magic or requirement on the side of Kafka Streams. For example, you can package your Java application as a fat jar file and then start the application via: -<pre> +<pre class="brush: bash;"> # Start the application in class `com.example.MyStreamsApp` # from the fat jar named `path-to-app-fatjar.jar`. $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
