This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new dadd02de8b0 HOTFIX: remove PageView example to support Java11 for
:streams:examples module (#19052)
dadd02de8b0 is described below
commit dadd02de8b0f9aa772973e668d92a9c59a266c27
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 3 10:00:15 2025 -0800
HOTFIX: remove PageView example to support Java11 for :streams:examples
module (#19052)
The PageView example depends on Connect to pull in JSON (de)serializers,
but Connect no longer supports Java 11.
To allow supporting Java 11 for the Kafka Streams examples, this PR
removes the PageView examples and the Connect dependency.
Reviewers: Bruno Cadonna <[email protected]>, Chia-Ping Tsai
<[email protected]>, David Jacot <[email protected]>
---
build.gradle | 4 +-
docs/streams/developer-guide/datatypes.html | 8 +-
.../examples/pageview/JsonTimestampExtractor.java | 46 ----
.../examples/pageview/PageViewTypedDemo.java | 250 ---------------------
.../examples/pageview/PageViewUntypedDemo.java | 120 ----------
5 files changed, 4 insertions(+), 424 deletions(-)
diff --git a/build.gradle b/build.gradle
index f4ed4395f0d..388a85aa851 100644
--- a/build.gradle
+++ b/build.gradle
@@ -51,7 +51,7 @@ ext {
gradleVersion = versions.gradle
minClientJavaVersion = 11
minNonClientJavaVersion = 17
- modulesNeedingJava11 = [":clients", ":generator", ":streams",
":streams:test-utils", ":streams-scala", ":test-common:test-common-util"]
+ modulesNeedingJava11 = [":clients", ":examples", ":generator", ":streams",
":streams:examples", ":streams:test-utils", ":streams-scala",
":test-common:test-common-util"]
buildVersionFileName = "kafka-version.properties"
@@ -2915,8 +2915,6 @@ project(':streams:examples') {
}
dependencies {
- // this dependency should be removed after we unify data API
- implementation(project(':connect:json'))
implementation project(':streams')
implementation libs.slf4jApi
diff --git a/docs/streams/developer-guide/datatypes.html
b/docs/streams/developer-guide/datatypes.html
index afa6397c5ef..f2968591ccd 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -156,11 +156,9 @@ userCountByRegion.to("RegionCountsTopic",
Produced.valueSerde(Serdes.L
</div>
<div class="section" id="json">
<h3>JSON<a class="headerlink" href="#json" title="Permalink to this
headline"></a></h3>
- <p>The Kafka Streams code examples also include a basic serde
implementation for JSON:</p>
- <ul class="simple">
- <li><a class="reference external"
href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java#L83">PageViewTypedDemo</a></li>
- </ul>
- <p>As shown in the example, you can use JSONSerdes inner classes <code
class="docutils literal"><span
class="pre">Serdes.serdeFrom(<serializerInstance>,
<deserializerInstance>)</span></code> to construct JSON compatible
serializers and deserializers.
+ <p>You can use <code>JsonSerializer</code> and
<code>JsonDeserializer</code> from Kafka Connect to construct JSON-compatible
serializers and deserializers
+ using <code class="docutils literal"><span
class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;,
&lt;deserializerInstance&gt;)</span></code>.
+ Note that Kafka Connect's JSON (de)serializers require Java 17.
</p>
</div>
<div class="section" id="implementing-custom-serdes">
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
deleted file mode 100644
index 6522aadf623..00000000000
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * A timestamp extractor implementation that tries to extract event time from
- * the "timestamp" field in the Json formatted message.
- */
-public class JsonTimestampExtractor implements TimestampExtractor {
-
- @Override
- public long extract(final ConsumerRecord<Object, Object> record, final
long partitionTime) {
- if (record.value() instanceof PageViewTypedDemo.PageView) {
- return ((PageViewTypedDemo.PageView) record.value()).timestamp;
- }
-
- if (record.value() instanceof PageViewTypedDemo.UserProfile) {
- return ((PageViewTypedDemo.UserProfile) record.value()).timestamp;
- }
-
- if (record.value() instanceof JsonNode) {
- return ((JsonNode) record.value()).get("timestamp").longValue();
- }
-
- throw new IllegalArgumentException("JsonTimestampExtractor cannot
recognize the record value " + record.value());
- }
-}
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
deleted file mode 100644
index 6de15ef91a4..00000000000
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.TimeWindows;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an
example of a stateful computation,
- * using specific data types (here: JSON POJO; but can also be Avro specific
bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * <p>In this example, we join a stream of pageviews (aka clickstreams) that
reads from a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named
"streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the
number of pageviews per user region.
- *
- * <p>Before running this example you must create the input topics and the
output topic (e.g. via
- * bin/kafka-topics --create ...), and write some data to the input topics
(e.g. via
- * bin/kafka-console-producer). Otherwise you won't see any data arriving in
the output topic.
- *
- * <p>The inputs for this example are:
- * - Topic: streams-pageview-input
- * Key Format: (String) USER_ID
- * Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page":
(String PAGE_ID), "timestamp": (long ms TIMESTAMP)}
- * <p>
- * - Topic: streams-userprofile-input
- * Key Format: (String) USER_ID
- * Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp":
(long ms TIMESTAMP)}
- *
- * <p>To observe the results, read the output topic (e.g., via
bin/kafka-console-consumer)
- * - Topic: streams-pageviewstats-typed-output
- * Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms
WINDOW_TIMESTAMP), "region": (String REGION)}
- * Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region":
(String REGION)}
- *
- * <p>Note, the "_t" field is necessary to help Jackson identify the correct
class for deserialization in the
- * generic {@link JSONSerde}. If you instead specify a specific serde per
class, you won't need the extra "_t" field.
- */
-public class PageViewTypedDemo {
-
- /**
- * A serde for any class that implements {@link JSONSerdeCompatible}. Note
that the classes also need to
- * be registered in the {@code @JsonSubTypes} annotation on {@link
JSONSerdeCompatible}.
- *
- * @param <T> The concrete type of the class that gets de/serialized
- */
- public static class JSONSerde<T extends JSONSerdeCompatible> implements
Serializer<T>, Deserializer<T>, Serde<T> {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- @Override
- public void configure(final Map<String, ?> configs, final boolean
isKey) {}
-
- @SuppressWarnings("unchecked")
- @Override
- public T deserialize(final String topic, final byte[] data) {
- if (data == null) {
- return null;
- }
-
- try {
- return (T) OBJECT_MAPPER.readValue(data,
JSONSerdeCompatible.class);
- } catch (final IOException e) {
- throw new SerializationException(e);
- }
- }
-
- @Override
- public byte[] serialize(final String topic, final T data) {
- if (data == null) {
- return null;
- }
-
- try {
- return OBJECT_MAPPER.writeValueAsBytes(data);
- } catch (final Exception e) {
- throw new SerializationException("Error serializing JSON
message", e);
- }
- }
-
- @Override
- public void close() {}
-
- @Override
- public Serializer<T> serializer() {
- return this;
- }
-
- @Override
- public Deserializer<T> deserializer() {
- return this;
- }
- }
-
- /**
- * An interface for registering types that can be de/serialized with
{@link JSONSerde}.
- */
- @SuppressWarnings("DefaultAnnotationParam") // being explicit for the
example
- @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
JsonTypeInfo.As.PROPERTY, property = "_t")
- @JsonSubTypes({
- @JsonSubTypes.Type(value = PageView.class, name = "pv"),
- @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
- @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
- @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name =
"wpvbr"),
- @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
- })
- public interface JSONSerdeCompatible {
-
- }
-
- // POJO classes
- public static class PageView implements JSONSerdeCompatible {
- public String user;
- public String page;
- public Long timestamp;
- }
-
- public static class UserProfile implements JSONSerdeCompatible {
- public String region;
- public Long timestamp;
- }
-
- public static class PageViewByRegion implements JSONSerdeCompatible {
- public String user;
- public String page;
- public String region;
- }
-
- public static class WindowedPageViewByRegion implements
JSONSerdeCompatible {
- public long windowStart;
- public String region;
- }
-
- public static class RegionCount implements JSONSerdeCompatible {
- public long count;
- public String region;
- }
-
- public static void main(final String[] args) {
- final Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"streams-pageview-typed");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
JsonTimestampExtractor.class);
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
JSONSerde.class);
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
JSONSerde.class);
- props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
- props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-
- // setting offset reset to earliest so that we can re-run the demo
code with the same pre-loaded data
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- final KStream<String, PageView> views =
builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new
JSONSerde<>()));
-
- final KTable<String, UserProfile> users =
builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new
JSONSerde<>()));
-
- final Duration duration24Hours = Duration.ofHours(24);
-
- final KStream<WindowedPageViewByRegion, RegionCount> regionCount =
views
- .leftJoin(users, (view, profile) -> {
- final PageViewByRegion viewByRegion = new PageViewByRegion();
- viewByRegion.user = view.user;
- viewByRegion.page = view.page;
-
- if (profile != null) {
- viewByRegion.region = profile.region;
- } else {
- viewByRegion.region = "UNKNOWN";
- }
- return viewByRegion;
- })
- .map((user, viewRegion) -> new KeyValue<>(viewRegion.region,
viewRegion))
- .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
- .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7),
duration24Hours).advanceBy(Duration.ofSeconds(1)))
- .count()
- .toStream()
- .map((key, value) -> {
- final WindowedPageViewByRegion wViewByRegion = new
WindowedPageViewByRegion();
- wViewByRegion.windowStart = key.window().start();
- wViewByRegion.region = key.key();
-
- final RegionCount rCount = new RegionCount();
- rCount.region = key.key();
- rCount.count = value;
-
- return new KeyValue<>(wViewByRegion, rCount);
- });
-
- // write to the result topic
- regionCount.to("streams-pageviewstats-typed-output", Produced.with(new
JSONSerde<>(), new JSONSerde<>()));
-
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- final CountDownLatch latch = new CountDownLatch(1);
-
- // attach shutdown handler to catch control-c
- Runtime.getRuntime().addShutdownHook(new
Thread("streams-pipe-shutdown-hook") {
- @Override
- public void run() {
- streams.close();
- latch.countDown();
- }
- });
-
- try {
- streams.start();
- latch.await();
- } catch (final Throwable e) {
- e.printStackTrace();
- System.exit(1);
- }
- System.exit(0);
- }
-}
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
deleted file mode 100644
index 155dc1b46b3..00000000000
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.connect.json.JsonDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.TimeWindows;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.time.Duration;
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an
example of a stateful computation,
- * using general data types (here: JSON; but can also be Avro generic
bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * <p>In this example, we join a stream of pageviews (aka clickstreams) that
reads from a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named
"streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the
number of pageviews per user region.
- *
- * <p>Before running this example you must create the input topics and the
output topic (e.g. via
- * bin/kafka-topics.sh --create ...), and write some data to the input topics
(e.g. via
- * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving
in the output topic.
- */
-public class PageViewUntypedDemo {
-
- public static void main(final String[] args) throws Exception {
- final Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"streams-pageview-untyped");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
JsonTimestampExtractor.class);
- props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-
- // setting offset reset to earliest so that we can re-run the demo
code with the same pre-loaded data
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
- final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
- final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
jsonDeserializer);
-
- final Consumed<String, JsonNode> consumed =
Consumed.with(Serdes.String(), jsonSerde);
- final KStream<String, JsonNode> views =
builder.stream("streams-pageview-input", consumed);
-
- final KTable<String, JsonNode> users =
builder.table("streams-userprofile-input", consumed);
-
- final KTable<String, String> userRegions = users.mapValues(record ->
record.get("region").textValue());
-
- final Duration duration24Hours = Duration.ofHours(24);
-
- final KStream<JsonNode, JsonNode> regionCount = views
- .leftJoin(userRegions, (view, region) -> {
- final ObjectNode jNode = JsonNodeFactory.instance.objectNode();
- return (JsonNode) jNode.put("user",
view.get("user").textValue())
- .put("page", view.get("page").textValue())
- .put("region", region == null ? "UNKNOWN" : region);
-
- })
- .map((user, viewRegion) -> new
KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7),
duration24Hours).advanceBy(Duration.ofSeconds(1)))
- .count()
- .toStream()
- .map((key, value) -> {
- final ObjectNode keyNode =
JsonNodeFactory.instance.objectNode();
- keyNode.put("window-start", key.window().start())
- .put("region", key.key());
-
- final ObjectNode valueNode =
JsonNodeFactory.instance.objectNode();
- valueNode.put("count", value);
-
- return new KeyValue<>((JsonNode) keyNode, (JsonNode)
valueNode);
- });
-
- // write to the result topic
- regionCount.to("streams-pageviewstats-untyped-output",
Produced.with(jsonSerde, jsonSerde));
-
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- streams.start();
-
- // usually the stream application would be running forever,
- // in this example we just let it run for some time and stop since the
input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
- }
-}