This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new d7fe53fd9dd MINOR: Remove Java 7 example code (#16308)
d7fe53fd9dd is described below
commit d7fe53fd9dde78ed4627b17e77838178258fe089
Author: Jim Galasyn <[email protected]>
AuthorDate: Wed Jun 12 16:50:34 2024 -0700
MINOR: Remove Java 7 example code (#16308)
Reviewers: Matthias J. Sax <[email protected]>
---
docs/streams/developer-guide/dsl-api.html | 406 ++----------------------
docs/streams/developer-guide/write-streams.html | 22 +-
docs/streams/index.html | 54 ----
3 files changed, 30 insertions(+), 452 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index fd5c22cae33..c5c1797490a 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -383,8 +383,7 @@ Map<String, KStream<String, Long>> branches =
// KStream branches.get("Branch-A") contains all records whose keys
start with "A"
// KStream branches.get("Branch-B") contains all records whose keys
start with "B"
// KStream branches.get("Branch-C") contains all other records
-
-// Java 7 example: cf. `filter` for how to create `Predicate`
instances</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p
class="first"><strong>Filter</strong></p>
@@ -401,15 +400,7 @@ Map<String, KStream<String, Long>> branches =
// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filter((key, value) ->
value > 0);
-
-// Java 7 example
-KStream<String, Long> onlyPositives = stream.filter(
- new Predicate<String, Long>() {
- @Override
- public boolean test(String key, Long value) {
- return value > 0;
- }
- });</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Inverse
Filter</strong></p>
@@ -427,14 +418,7 @@ KStream<String, Long> onlyPositives = stream.filter(
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filterNot((key, value)
-> value <= 0);
-// Java 7 example
-KStream<String, Long> onlyPositives = stream.filterNot(
- new Predicate<String, Long>() {
- @Override
- public boolean test(String key, Long value) {
- return value <= 0;
- }
- });</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p
class="first"><strong>FlatMap</strong></p>
@@ -460,8 +444,7 @@ KStream<String, Integer> transformed = stream.flatMap(
return result;
}
);
-
-// Java 7 example: cf. `map` for how to create `KeyValueMapper`
instances</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-even"><td><p
class="first"><strong>FlatMapValues</strong></p>
@@ -477,8 +460,7 @@ KStream<String, Integer> transformed = stream.flatMap(
<pre class="line-numbers"><code
class="language-java">// Split a sentence into words.
KStream<byte[], String> sentences = ...;
KStream<byte[], String> words = sentences.flatMapValues(value ->
Arrays.asList(value.split("\\s+")));
-
-// Java 7 example: cf. `mapValues` for how to create `ValueMapper`
instances</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p
class="first"><strong>Foreach</strong></p>
@@ -499,15 +481,7 @@ KStream<byte[], String> words =
sentences.flatMapValues(value -> Arrays
// Print the contents of the KStream to the local console.
// Java 8+ example, using lambda expressions
stream.foreach((key, value) -> System.out.println(key + " => "
+ value));
-
-// Java 7 example
-stream.foreach(
- new ForeachAction<String, Long>() {
- @Override
- public void apply(String key, Long value) {
- System.out.println(key + " => " + value);
- }
- });</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-even"><td><p
class="first"><strong>GroupByKey</strong></p>
@@ -598,34 +572,7 @@ KGroupedTable<String, Integer> groupedTable =
table.groupBy(
Serdes.Integer()) /* value (note: type was modified) */
);
-
-// Java 7 examples
-
-// Group the stream by a new key and key type
-KGroupedStream<String, String> groupedStream = stream.groupBy(
- new KeyValueMapper<byte[], String, String>>() {
- @Override
- public String apply(byte[] key, String value) {
- return value;
- }
- },
- Grouped.with(
- Serdes.String(), /* key (note: type was modified) */
- Serdes.String()) /* value */
- );
-
-// Group the table by a new key and key type, and also modify the value and
value type.
-KGroupedTable<String, Integer> groupedTable = table.groupBy(
- new KeyValueMapper<byte[], String, KeyValue<String,
Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(byte[] key, String value) {
- return KeyValue.pair(value, value.length());
- }
- },
- Grouped.with(
- Serdes.String(), /* key (note: type was modified) */
- Serdes.Integer()) /* value (note: type was modified) */
- );</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-even"><td><p
class="first"><strong>Cogroup</strong></p>
@@ -671,15 +618,7 @@ KTable<byte[], String> table2 =
cogroupedStream.windowedBy(TimeWindows.ofS
// as well as the value and the value type.
KStream<String, Integer> transformed = stream.map(
(key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
-
-// Java 7 example
-KStream<String, Integer> transformed = stream.map(
- new KeyValueMapper<byte[], String, KeyValue<String,
Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(byte[] key, String value) {
- return new KeyValue<>(value.toLowerCase(), value.length());
- }
- });</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Map
(values only)</strong></p>
@@ -698,15 +637,7 @@ KStream<String, Integer> transformed = stream.map(
// Java 8+ example, using lambda expressions
KStream<byte[], String> uppercased = stream.mapValues(value ->
value.toUpperCase());
-
-// Java 7 example
-KStream<byte[], String> uppercased = stream.mapValues(
- new ValueMapper<String>() {
- @Override
- public String apply(String s) {
- return s.toUpperCase();
- }
- });</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p
class="first"><strong>Merge</strong></p>
@@ -744,15 +675,7 @@ KStream<byte[], String> merged =
stream1.merge(stream2);</code></pre>
// Java 8+ example, using lambda expressions
KStream<byte[], String> unmodifiedStream = stream.peek(
(key, value) -> System.out.println("key=" + key + ",
value=" + value));
-
-// Java 7 example
-KStream<byte[], String> unmodifiedStream = stream.peek(
- new ForeachAction<byte[], String>() {
- @Override
- public void apply(byte[] key, String value) {
- System.out.println("key=" + key + ", value=" +
value);
- }
- });</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p
class="first"><strong>Print</strong></p>
@@ -788,15 +711,7 @@
stream.print(Printed.toFile("streams.out").withLabel("streams&quo
// Derive a new record key from the record's value. Note how the key type
changes, too.
// Java 8+ example, using lambda expressions
KStream<String, String> rekeyed = stream.selectKey((key, value) ->
value.split(" ")[0])
-
-// Java 7 example
-KStream<String, String> rekeyed = stream.selectKey(
- new KeyValueMapper<byte[], String, String>() {
- @Override
- public String apply(byte[] key, String value) {
- return value.split(" ")[0];
- }
- });</code></pre>
+</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Table to
Stream</strong></p>
@@ -895,25 +810,7 @@ KStream<String, Long> wordCounts = textLines
.count()
// Convert the `KTable<String, Long>` into a `KStream<String,
Long>`.
.toStream();</code></pre>
- <p>WordCount example in Java 7:</p>
- <pre class="line-numbers"><code class="language-java">// Code
below is equivalent to the previous Java 8+ example above.
-KStream<String, String> textLines = ...;
-KStream<String, Long> wordCounts = textLines
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.toLowerCase().split("\\W+"));
- }
- })
- .groupBy(new KeyValueMapper<String, String, String>>() {
- @Override
- public String apply(String key, String word) {
- return word;
- }
- })
- .count()
- .toStream();</code></pre>
<div class="section" id="aggregating">
<span
id="streams-developer-guide-dsl-aggregating"></span><h4><a class="toc-backref"
href="#id12">Aggregating</a><a class="headerlink" href="#aggregating"
title="Permalink to this headline"></a></h4>
<p>After records are <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">grouped</span></a> by key via <code class="docutils literal"><span
class="pre">groupByKey</span></code> or
@@ -969,49 +866,7 @@ KTable<byte[], Long> aggregatedTable =
groupedTable.aggregate(
(aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /*
subtractor */
Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("aggregated-table-store") /* state store name */
.withValueSerde(Serdes.Long()) /* serde for aggregate value */
-
-
-// Java 7 examples
-
-// Aggregating a KGroupedStream (note how the value type changes from String
to Long)
-KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
- new Initializer<Long>() { /* initializer */
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<byte[], String, Long>() { /* adder */
- @Override
- public Long apply(byte[] aggKey, String newValue, Long aggValue) {
- return aggValue + newValue.length();
- }
- },
- Materialized.as("aggregated-stream-store")
- .withValueSerde(Serdes.Long());
-
-// Aggregating a KGroupedTable (note how the value type changes from String to
Long)
-KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
- new Initializer<Long>() { /* initializer */
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<byte[], String, Long>() { /* adder */
- @Override
- public Long apply(byte[] aggKey, String newValue, Long aggValue) {
- return aggValue + newValue.length();
- }
- },
- new Aggregator<byte[], String, Long>() { /* subtractor */
- @Override
- public Long apply(byte[] aggKey, String oldValue, Long aggValue) {
- return aggValue - oldValue.length();
- }
- },
- Materialized.as("aggregated-stream-store")
- .withValueSerde(Serdes.Long());</code></pre>
+</code></pre>
<p>Detailed behavior of <code class="docutils
literal"><span class="pre">KGroupedStream</span></code>:</p>
<ul class="simple">
<li>Input records with <code
class="docutils literal"><span class="pre">null</span></code> keys are
ignored.</li>
@@ -1084,50 +939,7 @@ KTable<Windowed<String>, Long>
sessionizedAggregatedStream = grouped
(aggKey, leftAggValue, rightAggValue) -> leftAggValue +
rightAggValue, /* session merger */
Materialized.<String, Long, SessionStore<Bytes,
byte[]>>as("sessionized-aggregated-stream-store") /* state
store name */
.withValueSerde(Serdes.Long())); /* serde for aggregate value */
-
-// Java 7 examples
-
-// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
-KTable<Windowed<String>, Long> timeWindowedAggregatedStream =
groupedStream.windowedBy(Duration.ofMinutes(5))
- .aggregate(
- new Initializer<Long>() { /* initializer */
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<String, Long, Long>() { /* adder */
- @Override
- public Long apply(String aggKey, Long newValue, Long aggValue) {
- return aggValue + newValue;
- }
- },
- Materialized.<String, Long, WindowStore<Bytes,
byte[]>>as("time-windowed-aggregated-stream-store")
- .withValueSerde(Serdes.Long()));
-
-// Aggregating with session-based windowing (here: with an inactivity gap of 5
minutes)
-KTable<Windowed<String>, Long> sessionizedAggregatedStream =
groupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)).
- aggregate(
- new Initializer<Long>() { /* initializer */
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<String, Long, Long>() { /* adder */
- @Override
- public Long apply(String aggKey, Long newValue, Long aggValue) {
- return aggValue + newValue;
- }
- },
- new Merger<String, Long>() { /* session merger */
- @Override
- public Long apply(String aggKey, Long leftAggValue, Long
rightAggValue) {
- return rightAggValue + leftAggValue;
- }
- },
- Materialized.<String, Long, SessionStore<Bytes,
byte[]>>as("sessionized-aggregated-stream-store")
- .withValueSerde(Serdes.Long()));</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed aggregate behaves similar
to the rolling aggregate described above. The additional twist is that
@@ -1233,33 +1045,7 @@ KTable<String, Long> aggregatedStream =
groupedStream.reduce(
KTable<String, Long> aggregatedTable = groupedTable.reduce(
(aggValue, newValue) -> aggValue + newValue, /* adder */
(aggValue, oldValue) -> aggValue - oldValue /* subtractor */);
-
-
-// Java 7 examples
-
-// Reducing a KGroupedStream
-KTable<String, Long> aggregatedStream = groupedStream.reduce(
- new Reducer<Long>() { /* adder */
- @Override
- public Long apply(Long aggValue, Long newValue) {
- return aggValue + newValue;
- }
- });
-
-// Reducing a KGroupedTable
-KTable<String, Long> aggregatedTable = groupedTable.reduce(
- new Reducer<Long>() { /* adder */
- @Override
- public Long apply(Long aggValue, Long newValue) {
- return aggValue + newValue;
- }
- },
- new Reducer<Long>() { /* subtractor */
- @Override
- public Long apply(Long aggValue, Long oldValue) {
- return aggValue - oldValue;
- }
- });</code></pre>
+</code></pre>
<p>Detailed behavior for <code class="docutils
literal"><span class="pre">KGroupedStream</span></code>:</p>
<ul class="simple">
<li>Input records with <code
class="docutils literal"><span class="pre">null</span></code> keys are ignored
in general.</li>
@@ -1328,31 +1114,7 @@ KTable<Windowed<String>, Long>
sessionzedAggregatedStream = groupedS
.reduce(
(aggValue, newValue) -> aggValue + newValue /* adder */
);
-
-
-// Java 7 examples
-
-// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
-KTable<Windowed<String>, Long> timeWindowedAggregatedStream =
groupedStream.windowedBy(
- TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) /* time-based window */)
- .reduce(
- new Reducer<Long>() { /* adder */
- @Override
- public Long apply(Long aggValue, Long newValue) {
- return aggValue + newValue;
- }
- });
-
-// Aggregating with session-based windowing (here: with an inactivity gap of 5
minutes)
-KTable<Windowed<String>, Long> timeWindowedAggregatedStream =
groupedStream.windowedBy(
- SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) /* session
window */
- .reduce(
- new Reducer<Long>() { /* adder */
- @Override
- public Long apply(Long aggValue, Long newValue) {
- return aggValue + newValue;
- }
- });</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed reduce behaves similar to
the rolling reduce described above. The additional twist is that the
@@ -1812,21 +1574,7 @@ KStream<String, String> joined = left.join(right,
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);
-
-// Java 7 example
-KStream<String, String> joined = left.join(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- },
- JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
- Joined.with(
- Serdes.String(), /* key */
- Serdes.Long(), /* left value */
- Serdes.Double()) /* right value */
- );</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>, and <em>window-based</em>,
i.e. two input records are joined if and only if their
@@ -1869,21 +1617,7 @@ KStream<String, String> joined =
left.leftJoin(right,
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);
-
-// Java 7 example
-KStream<String, String> joined = left.leftJoin(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- },
- JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
- Joined.with(
- Serdes.String(), /* key */
- Serdes.Long(), /* left value */
- Serdes.Double()) /* right value */
- );</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>, and <em>window-based</em>,
i.e. two input records are joined if and only if their
@@ -1930,21 +1664,7 @@ KStream<String, String> joined =
left.outerJoin(right,
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);
-
-// Java 7 example
-KStream<String, String> joined = left.outerJoin(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- },
- JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
- Joined.with(
- Serdes.String(), /* key */
- Serdes.Long(), /* left value */
- Serdes.Double()) /* right value */
- );</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>, and <em>window-based</em>,
i.e. two input records are joined if and only if their
@@ -2197,15 +1917,7 @@ KTable<String, Double> right = ...;
KTable<String, String> joined = left.join(right,
(leftValue, rightValue) -> "left=" + leftValue + ",
right=" + rightValue /* ValueJoiner */
);
-
-// Java 7 example
-KTable<String, String> joined = left.join(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- });</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>.</p>
@@ -2244,15 +1956,7 @@ KTable<String, Double> right = ...;
KTable<String, String> joined = left.leftJoin(right,
(leftValue, rightValue) -> "left=" + leftValue + ",
right=" + rightValue /* ValueJoiner */
);
-
-// Java 7 example
-KTable<String, String> joined = left.leftJoin(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- });</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>.</p>
@@ -2294,15 +1998,7 @@ KTable<String, Double> right = ...;
KTable<String, String> joined = left.outerJoin(right,
(leftValue, rightValue) -> "left=" + leftValue + ",
right=" + rightValue /* ValueJoiner */
);
-
-// Java 7 example
-KTable<String, String> joined = left.outerJoin(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- });</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>.</p>
@@ -2816,19 +2512,7 @@ KStream<String, String> joined = left.join(right,
.withValueSerde(Serdes.Long()) /* left value */
.withGracePeriod(Duration.ZERO) /* grace period */
);
-
-// Java 7 example
-KStream<String, String> joined = left.join(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- },
- Joined.keySerde(Serdes.String()) /* key */
- .withValueSerde(Serdes.Long()) /* left value */
- .withGracePeriod(Duration.ZERO) /* grace period */
- );</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>.</p>
@@ -2878,19 +2562,7 @@ KStream<String, String> joined =
left.leftJoin(right,
.withValueSerde(Serdes.Long()) /* left value */
.withGracePeriod(Duration.ZERO) /* grace period */
);
-
-// Java 7 example
-KStream<String, String> joined = left.leftJoin(right,
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- },
- Joined.keySerde(Serdes.String()) /* key */
- .withValueSerde(Serdes.Long()) /* left value */
- .withGracePeriod(Duration.ZERO) /* grace period */
- );</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is
<em>key-based</em>, i.e. with the join predicate <code class="docutils
literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span>
<span class="pre">rightRecord.key</span></code>.</p>
@@ -3101,21 +2773,7 @@ KStream<String, String> joined = left.join(right,
(leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new
key by which to lookup against the table */
(leftValue, rightValue) -> "left=" + leftValue + ",
right=" + rightValue /* ValueJoiner */
);
-
-// Java 7 example
-KStream<String, String> joined = left.join(right,
- new KeyValueMapper<String, Long, Integer>() { /* derive a
(potentially) new key by which to lookup against the table */
- @Override
- public Integer apply(String key, Long value) {
- return key.length();
- }
- },
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- });</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul class="last">
<li><p class="first">The join is
indirectly <em>key-based</em>, i.e. with the join predicate <code
class="docutils literal"><span
class="pre">KeyValueMapper#apply(leftRecord.key,</span> <span
class="pre">leftRecord.value)</span> <span class="pre">==</span> <span
class="pre">rightRecord.key</span></code>.</p>
@@ -3153,21 +2811,7 @@ KStream<String, String> joined =
left.leftJoin(right,
(leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new
key by which to lookup against the table */
(leftValue, rightValue) -> "left=" + leftValue + ",
right=" + rightValue /* ValueJoiner */
);
-
-// Java 7 example
-KStream<String, String> joined = left.leftJoin(right,
- new KeyValueMapper<String, Long, Integer>() { /* derive a
(potentially) new key by which to lookup against the table */
- @Override
- public Integer apply(String key, Long value) {
- return key.length();
- }
- },
- new ValueJoiner<Long, Double, String>() {
- @Override
- public String apply(Long leftValue, Double rightValue) {
- return "left=" + leftValue + ", right=" +
rightValue;
- }
- });</code></pre>
+</code></pre>
<p>Detailed behavior:</p>
<ul class="last">
<li><p class="first">The join is
indirectly <em>key-based</em>, i.e. with the join predicate <code
class="docutils literal"><span
class="pre">KeyValueMapper#apply(leftRecord.key,</span> <span
class="pre">leftRecord.value)</span> <span class="pre">==</span> <span
class="pre">rightRecord.key</span></code>.</p>
diff --git a/docs/streams/developer-guide/write-streams.html b/docs/streams/developer-guide/write-streams.html
index aae71388a78..21d73f1054e 100644
--- a/docs/streams/developer-guide/write-streams.html
+++ b/docs/streams/developer-guide/write-streams.html
@@ -152,31 +152,19 @@ streams.start();</code></pre>
<p>To catch any unexpected exceptions, you can set an <code
class="docutils literal"><span
class="pre">java.lang.Thread.UncaughtExceptionHandler</span></code> before you
start the
application. This handler is called whenever a stream thread is
terminated by an unexpected exception:</p>
<pre class="line-numbers"><code class="language-java">// Java 8+,
using lambda expressions
-streams.setUncaughtExceptionHander((exception) ->
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);</code></pre>
- <p>The <code class="docutils literal"><span
class="pre">StreamsUncaughtExceptionHandler</span></code> interface enables
responding to exceptions not handled by Kafka Streams. It has one method, <code
class="docutils literal"><span class="pre">handle</span></code>, that returns
an enum of type <code class="docutils literal"><span
class="pre">StreamThreadExceptionResponse</span></code>. You have the
opportunity to define how Streams responds to the exception, with three
possible valu [...]
- <p>The <code class="docutils literal"><span
class="pre">SHUTDOWN_APPLICATION</span></code> option is best-effort only and
doesn't guarantee that all application instances will be stopped.
+streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
+ // here you should examine the throwable/exception and perform an appropriate action!
+});
+</code></pre>
<p>To stop the application instance, call the <code class="docutils
literal"><span class="pre">KafkaStreams#close()</span></code> method:</p>
<pre class="line-numbers"><code class="language-java">// Stop the
Kafka Streams threads
streams.close();</code></pre>
<p>To allow your application to gracefully shutdown in response to
SIGTERM, it is recommended that you add a shutdown hook
and call <code class="docutils literal"><span
class="pre">KafkaStreams#close</span></code>.</p>
- <ul>
- <li><p class="first">Here is a shutdown hook example in Java 8+:</p>
+ <p class="first">Here is a shutdown hook example in Java 8+:</p>
<pre class="line-numbers"><code class="language-java">// Add
shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));</code></pre>
- </li>
- <li><p class="first">Here is a shutdown hook example in Java 7:</p>
- <pre class="line-numbers"><code class="language-java">// Add
shutdown hook to stop the Kafka Streams threads.
-// You can optionally provide a timeout to `close`.
-Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- streams.close();
- }
-}));</code></pre>
- </li>
- </ul>
<p>After an application is stopped, Kafka Streams will migrate any tasks
that had been running in this instance to available remaining
instances.</p>
</div>
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 7fd2d7e6ade..307a9648b20 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -193,7 +193,6 @@
<div class="code-example">
<div class="btn-group">
<a class="selected b-java-8" data-section="java-8">Java 8+</a>
- <a class="b-java-7" data-section="java-7">Java 7</a>
<a class="b-scala" data-section="scala">Scala</a>
</div>
@@ -233,59 +232,6 @@ public class WordCountApplication {
streams.start();
}
-}</code></pre>
- </div>
-
- <div class="code-example__snippet b-java-7">
- <pre class="line-numbers"><code class="language-java">import
org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-public class WordCountApplication {
-
- public static void main(final String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
- StreamsBuilder builder = new StreamsBuilder();
- KStream<String, String> textLines =
builder.stream("TextLinesTopic");
- KTable<String, Long> wordCounts = textLines
- .flatMapValues(new ValueMapper<String,
Iterable<String>>() {
- @Override
- public Iterable<String> apply(String textLine) {
- return Arrays.asList(textLine.toLowerCase().split("\\W+"));
- }
- })
- .groupBy(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String word) {
- return word;
- }
- })
- .count(Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("counts-store"));
-
-
- wordCounts.toStream().to("WordsWithCountsTopic",
Produced.with(Serdes.String(), Serdes.Long()));
-
- KafkaStreams streams = new KafkaStreams(builder.build(), props);
- streams.start();
- }
-
}</code></pre>
</div>