This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new dadd02de8b0 HOTFIX: remove PageView example to support Java11 for :streams:examples module (#19052)
dadd02de8b0 is described below

commit dadd02de8b0f9aa772973e668d92a9c59a266c27
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 3 10:00:15 2025 -0800

    HOTFIX: remove PageView example to support Java11 for :streams:examples module (#19052)
    
    The PageView example depends on Connect to pull in JSON (de)serializers,
    but Connect no longer supports Java 11.

    To keep supporting Java 11 for the Kafka Streams examples, this PR
    removes the PageView examples and the Connect dependency.
    
    Reviewers: Bruno Cadonna <[email protected]>, Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
---
 build.gradle                                       |   4 +-
 docs/streams/developer-guide/datatypes.html        |   8 +-
 .../examples/pageview/JsonTimestampExtractor.java  |  46 ----
 .../examples/pageview/PageViewTypedDemo.java       | 250 ---------------------
 .../examples/pageview/PageViewUntypedDemo.java     | 120 ----------
 5 files changed, 4 insertions(+), 424 deletions(-)

diff --git a/build.gradle b/build.gradle
index f4ed4395f0d..388a85aa851 100644
--- a/build.gradle
+++ b/build.gradle
@@ -51,7 +51,7 @@ ext {
   gradleVersion = versions.gradle
   minClientJavaVersion = 11
   minNonClientJavaVersion = 17
-  modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-util"]
+  modulesNeedingJava11 = [":clients", ":examples", ":generator", ":streams", ":streams:examples", ":streams:test-utils", ":streams-scala", ":test-common:test-common-util"]
 
   buildVersionFileName = "kafka-version.properties"
 
@@ -2915,8 +2915,6 @@ project(':streams:examples') {
   }
 
   dependencies {
-    // this dependency should be removed after we unify data API
-    implementation(project(':connect:json'))
     implementation project(':streams')
     implementation libs.slf4jApi
 
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index afa6397c5ef..f2968591ccd 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -156,11 +156,9 @@ userCountByRegion.to(&quot;RegionCountsTopic&quot;, Produced.valueSerde(Serdes.L
       </div>
       <div class="section" id="json">
        <h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></a></h3>
-        <p>The Kafka Streams code examples also include a basic serde implementation for JSON:</p>
-        <ul class="simple">
-          <li><a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java#L83">PageViewTypedDemo</a></li>
-        </ul>
-        <p>As shown in the example, you can use JSONSerdes inner classes <code class="docutils literal"><span class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;)</span></code> to construct JSON compatible serializers and deserializers.
+        <p>You can use <code>JsonSerializer</code> and <code>JsonDeserializer</code> from Kafka Connect to construct JSON compatible serializers and deserializers
+            using <code class="docutils literal"><span class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;)</span></code>.
+            Note that Kafka Connect's JSON (de)serializer requires Java 17.
         </p>
       </div>
       <div class="section" id="implementing-custom-serdes">
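
For reference, the pattern the updated docs describe looks roughly like the following, a minimal sketch rather than part of this diff; it assumes the :connect:json module and Java 17 on the classpath, and mirrors what the removed PageViewUntypedDemo did:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.connect.json.JsonDeserializer;
    import org.apache.kafka.connect.json.JsonSerializer;

    import com.fasterxml.jackson.databind.JsonNode;

    // Wrap Connect's JSON serializer/deserializer into a single Streams serde.
    final Serde<JsonNode> jsonSerde =
        Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());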
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
deleted file mode 100644
index 6522aadf623..00000000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * A timestamp extractor implementation that tries to extract event time from
- * the "timestamp" field in the Json formatted message.
- */
-public class JsonTimestampExtractor implements TimestampExtractor {
-
-    @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
-        if (record.value() instanceof PageViewTypedDemo.PageView) {
-            return ((PageViewTypedDemo.PageView) record.value()).timestamp;
-        }
-
-        if (record.value() instanceof PageViewTypedDemo.UserProfile) {
-            return ((PageViewTypedDemo.UserProfile) record.value()).timestamp;
-        }
-
-        if (record.value() instanceof JsonNode) {
-            return ((JsonNode) record.value()).get("timestamp").longValue();
-        }
-
-        throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value());
-    }
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
deleted file mode 100644
index 6de15ef91a4..00000000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.TimeWindows;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * <p>In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * <p>Before running this example you must create the input topics and the output topic (e.g. via
- * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via
- * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic.
- *
- * <p>The inputs for this example are:
- * - Topic: streams-pageview-input
- *   Key Format: (String) USER_ID
- *   Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)}
- * <p>
- * - Topic: streams-userprofile-input
- *   Key Format: (String) USER_ID
- *   Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)}
- *
- * <p>To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
- * - Topic: streams-pageviewstats-typed-output
- *   Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)}
- *   Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)}
- *
- * <p>Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
- * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field.
- */
-public class PageViewTypedDemo {
-
-    /**
-     * A serde for any class that implements {@link JSONSerdeCompatible}. Note that the classes also need to
-     * be registered in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}.
-     *
-     * @param <T> The concrete type of the class that gets de/serialized
-     */
-    public static class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> {
-        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-        @Override
-        public void configure(final Map<String, ?> configs, final boolean isKey) {}
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public T deserialize(final String topic, final byte[] data) {
-            if (data == null) {
-                return null;
-            }
-
-            try {
-                return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
-            } catch (final IOException e) {
-                throw new SerializationException(e);
-            }
-        }
-
-        @Override
-        public byte[] serialize(final String topic, final T data) {
-            if (data == null) {
-                return null;
-            }
-
-            try {
-                return OBJECT_MAPPER.writeValueAsBytes(data);
-            } catch (final Exception e) {
-                throw new SerializationException("Error serializing JSON message", e);
-            }
-        }
-
-        @Override
-        public void close() {}
-
-        @Override
-        public Serializer<T> serializer() {
-            return this;
-        }
-
-        @Override
-        public Deserializer<T> deserializer() {
-            return this;
-        }
-    }
-
-    /**
-     * An interface for registering types that can be de/serialized with {@link JSONSerde}.
-     */
-    @SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
-    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
-    @JsonSubTypes({
-        @JsonSubTypes.Type(value = PageView.class, name = "pv"),
-        @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
-        @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
-        @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
-        @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
-    })
-    public interface JSONSerdeCompatible {
-
-    }
-
-    // POJO classes
-    public static class PageView implements JSONSerdeCompatible {
-        public String user;
-        public String page;
-        public Long timestamp;
-    }
-
-    public static class UserProfile implements JSONSerdeCompatible {
-        public String region;
-        public Long timestamp;
-    }
-
-    public static class PageViewByRegion implements JSONSerdeCompatible {
-        public String user;
-        public String page;
-        public String region;
-    }
-
-    public static class WindowedPageViewByRegion implements JSONSerdeCompatible {
-        public long windowStart;
-        public String region;
-    }
-
-    public static class RegionCount implements JSONSerdeCompatible {
-        public long count;
-        public String region;
-    }
-
-    public static void main(final String[] args) {
-        final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
-
-        final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
-
-        final Duration duration24Hours = Duration.ofHours(24);
-
-        final KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-            .leftJoin(users, (view, profile) -> {
-                final PageViewByRegion viewByRegion = new PageViewByRegion();
-                viewByRegion.user = view.user;
-                viewByRegion.page = view.page;
-
-                if (profile != null) {
-                    viewByRegion.region = profile.region;
-                } else {
-                    viewByRegion.region = "UNKNOWN";
-                }
-                return viewByRegion;
-            })
-            .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
-            .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
-            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), duration24Hours).advanceBy(Duration.ofSeconds(1)))
-            .count()
-            .toStream()
-            .map((key, value) -> {
-                final WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                wViewByRegion.windowStart = key.window().start();
-                wViewByRegion.region = key.key();
-
-                final RegionCount rCount = new RegionCount();
-                rCount.region = key.key();
-                rCount.count = value;
-
-                return new KeyValue<>(wViewByRegion, rCount);
-            });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", Produced.with(new JSONSerde<>(), new JSONSerde<>()));
-
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        // attach shutdown handler to catch control-c
-        Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
-            @Override
-            public void run() {
-                streams.close();
-                latch.countDown();
-            }
-        });
-
-        try {
-            streams.start();
-            latch.await();
-        } catch (final Throwable e) {
-            e.printStackTrace();
-            System.exit(1);
-        }
-        System.exit(0);
-    }
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
deleted file mode 100644
index 155dc1b46b3..00000000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.connect.json.JsonDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.TimeWindows;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.time.Duration;
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * <p>In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * <p>Before running this example you must create the input topics and the output topic (e.g. via
- * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via
- * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewUntypedDemo {
-
-    public static void main(final String[] args) throws Exception {
-        final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
-        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
-        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
-
-        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
-        final KStream<String, JsonNode> views = builder.stream("streams-pageview-input", consumed);
-
-        final KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed);
-
-        final KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
-
-        final Duration duration24Hours = Duration.ofHours(24);
-
-        final KStream<JsonNode, JsonNode> regionCount = views
-            .leftJoin(userRegions, (view, region) -> {
-                final ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-                return (JsonNode) jNode.put("user", view.get("user").textValue())
-                        .put("page", view.get("page").textValue())
-                        .put("region", region == null ? "UNKNOWN" : region);
-
-            })
-            .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
-            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
-            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), duration24Hours).advanceBy(Duration.ofSeconds(1)))
-            .count()
-            .toStream()
-            .map((key, value) -> {
-                final ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                keyNode.put("window-start", key.window().start())
-                        .put("region", key.key());
-
-                final ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                valueNode.put("count", value);
-
-                return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-            });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));
-
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}
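
For users who need to stay on Java 11, the JsonNode serde the removed demos relied on can be rebuilt directly on Jackson with no Connect dependency, which is exactly why :streams:examples can now target Java 11. The following is a sketch under that assumption; the class name PlainJsonSerde is hypothetical, not part of Kafka:

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.IOException;

    // Hypothetical Connect-free replacement for the JsonNode serde used by the
    // removed demos: plain Jackson only, so it runs on Java 11.
    public final class PlainJsonSerde {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        private PlainJsonSerde() {}

        public static Serde<JsonNode> serde() {
            // Serializer and Deserializer are single-method interfaces, so
            // lambdas suffice; nulls pass through, as in the removed JSONSerde.
            final Serializer<JsonNode> serializer = (topic, data) -> {
                if (data == null) {
                    return null;
                }
                try {
                    return MAPPER.writeValueAsBytes(data);
                } catch (final IOException e) {
                    throw new SerializationException("Error serializing JSON message", e);
                }
            };
            final Deserializer<JsonNode> deserializer = (topic, bytes) -> {
                if (bytes == null) {
                    return null;
                }
                try {
                    return MAPPER.readTree(bytes);
                } catch (final IOException e) {
                    throw new SerializationException("Error deserializing JSON message", e);
                }
            };
            return Serdes.serdeFrom(serializer, deserializer);
        }
    }

A drop-in usage would be Consumed.with(Serdes.String(), PlainJsonSerde.serde()), matching how the removed PageViewUntypedDemo wired its input topics.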
