This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d57fe1b MINOR: single Jackson serde for PageViewTypedDemo (#5590) d57fe1b is described below commit d57fe1b053546966e6a867d84ee24dd256bb071a Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Fri Aug 31 15:13:42 2018 -0500 MINOR: single Jackson serde for PageViewTypedDemo (#5590) Previously, we depicted creating a Jackson serde for every pojo class, which becomes a burden in practice. There are many ways to avoid this and just have a single serde, so we've decided to model this design choice instead. Reviewers: Viktor Somogyi <viktorsomo...@gmail.com>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- checkstyle/import-control.xml | 2 +- gradle/findbugs-exclude.xml | 17 ++ .../examples/pageview/JsonPOJODeserializer.java | 61 ------- .../examples/pageview/JsonPOJOSerializer.java | 55 ------- .../examples/pageview/PageViewTypedDemo.java | 175 +++++++++++++-------- 5 files changed, 128 insertions(+), 182 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 840e551..bd5c11f 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -216,7 +216,7 @@ <allow pkg="org.apache.kafka.streams"/> <subpackage name="examples"> - <allow pkg="com.fasterxml.jackson.databind" /> + <allow pkg="com.fasterxml.jackson" /> <allow pkg="org.apache.kafka.connect.json" /> </subpackage> diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml index 0998185..62240d8 100644 --- a/gradle/findbugs-exclude.xml +++ b/gradle/findbugs-exclude.xml @@ -292,4 +292,21 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc </Or> <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/> </Match> + + <!-- Suppress warnings for unused members that are undetectably used by Jackson --> + <Match> + <Package name="org.apache.kafka.streams.examples.pageview"/> + <Bug pattern="NP_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/> + </Match> + <Match> + <Package name="org.apache.kafka.streams.examples.pageview"/> + <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/> + </Match> + <Match> + <Package name="org.apache.kafka.streams.examples.pageview"/> + <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/> + </Match> + + <!-- END Suppress warnings for unused members that are undetectably used by Jackson --> + </FindBugsFilter> diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java deleted file mode 100644 index d55246c..0000000 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.examples.pageview; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; - -import java.util.Map; - -public class JsonPOJODeserializer<T> implements Deserializer<T> { - private ObjectMapper objectMapper = new ObjectMapper(); - - private Class<T> tClass; - - /** - * Default constructor needed by Kafka - */ - public JsonPOJODeserializer() { - } - - @SuppressWarnings("unchecked") - @Override - public void configure(final Map<String, ?> props, final boolean isKey) { - tClass = (Class<T>) props.get("JsonPOJOClass"); - } - - @Override - public T deserialize(final String topic, final byte[] bytes) { - if (bytes == null) - return null; - - final T data; - try { - data = objectMapper.readValue(bytes, tClass); - } catch (final Exception e) { - throw new SerializationException(e); - } - - return data; - } - - @Override - public void close() { - - } -} diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java deleted file mode 100644 index 81ccf1e..0000000 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.examples.pageview; - - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Serializer; - -import java.util.Map; - -public class JsonPOJOSerializer<T> implements Serializer<T> { - private final ObjectMapper objectMapper = new ObjectMapper(); - - /** - * Default constructor needed by Kafka - */ - public JsonPOJOSerializer() { - } - - @Override - public void configure(final Map<String, ?> props, final boolean isKey) { - } - - @Override - public byte[] serialize(final String topic, final T data) { - if (data == null) - return null; - - try { - return objectMapper.writeValueAsBytes(data); - } catch (final Exception e) { - throw new SerializationException("Error serializing JSON message", e); - } - } - - @Override - public void close() { - } - -} diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 503dbeb..871d836 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -16,23 +16,26 @@ */ package org.apache.kafka.streams.examples.pageview; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; -import java.util.HashMap; +import java.io.IOException; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -48,35 +51,122 @@ import java.util.concurrent.TimeUnit; * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. * * Before running this example you must create the input topics and the output topic (e.g. via - * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via - * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via + * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic. + * + * The inputs for this example are: + * - Topic: streams-pageview-input + * Key Format: (String) USER_ID + * Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)} + * + * - Topic: streams-userprofile-input + * Key Format: (String) USER_ID + * Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)} + * + * To observe the results, read the output topic (e.g., via bin/kafka-console-consumer) + * - Topic: streams-pageviewstats-typed-output + * Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)} + * Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)} + * + * Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the + * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field. */ +@SuppressWarnings({"WeakerAccess", "unused"}) public class PageViewTypedDemo { + /** + * A serde for any class that implements {@link JSONSerdeCompatible}. Note that the classes also need to + * be registered in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}. + * + * @param <T> The concrete type of the class that gets de/serialized + */ + public static class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) {} + + @SuppressWarnings("unchecked") + @Override + public T deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + + try { + return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class); + } catch (final IOException e) { + throw new SerializationException(e); + } + } + + @Override + public byte[] serialize(final String topic, final T data) { + if (data == null) { + return null; + } + + try { + return OBJECT_MAPPER.writeValueAsBytes(data); + } catch (final Exception e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() {} + + @Override + public Serializer<T> serializer() { + return this; + } + + @Override + public Deserializer<T> deserializer() { + return this; + } + } + + /** + * An interface for registering types that can be de/serialized with {@link JSONSerde}. + */ + @SuppressWarnings("DefaultAnnotationParam") // being explicit for the example + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t") + @JsonSubTypes({ + @JsonSubTypes.Type(value = PageView.class, name = "pv"), + @JsonSubTypes.Type(value = UserProfile.class, name = "up"), + @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"), + @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"), + @JsonSubTypes.Type(value = RegionCount.class, name = "rc") + }) + public interface JSONSerdeCompatible { + + } + // POJO classes - static public class PageView { + static public class PageView implements JSONSerdeCompatible { public String user; public String page; public Long timestamp; } - static public class UserProfile { + static public class UserProfile implements JSONSerdeCompatible { public String region; public Long timestamp; } - static public class PageViewByRegion { + static public class PageViewByRegion implements JSONSerdeCompatible { public String user; public String page; public String region; } - static public class WindowedPageViewByRegion { + static public class WindowedPageViewByRegion implements JSONSerdeCompatible { public long windowStart; public String region; } - static public class RegionCount { + static public class RegionCount implements JSONSerdeCompatible { public long count; public String region; } @@ -86,67 +176,21 @@ public class PageViewTypedDemo { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class); + props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, JSONSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class); + props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JSONSerde.class); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final StreamsBuilder builder = new StreamsBuilder(); - // TODO: the following can be removed with a serialization factory - final Map<String, Object> serdeProps = new HashMap<>(); - - final Serializer<PageView> pageViewSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", PageView.class); - pageViewSerializer.configure(serdeProps, false); - - final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", PageView.class); - pageViewDeserializer.configure(serdeProps, false); - - final Serde<PageView> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer); - - final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", UserProfile.class); - userProfileSerializer.configure(serdeProps, false); - - final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", UserProfile.class); - userProfileDeserializer.configure(serdeProps, false); - - final Serde<UserProfile> userProfileSerde = Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer); - - final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); - wPageViewByRegionSerializer.configure(serdeProps, false); - - final Deserializer<WindowedPageViewByRegion> wPageViewByRegionDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); - wPageViewByRegionDeserializer.configure(serdeProps, false); - - final Serde<WindowedPageViewByRegion> wPageViewByRegionSerde = Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer); - - final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", RegionCount.class); - regionCountSerializer.configure(serdeProps, false); - - final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", RegionCount.class); - regionCountDeserializer.configure(serdeProps, false); - final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer); - - final Serializer<PageViewByRegion> pageViewByRegionSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", PageViewByRegion.class); - pageViewByRegionSerializer.configure(serdeProps, false); - final Deserializer<PageViewByRegion> pageViewByRegionDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", PageViewByRegion.class); - pageViewByRegionDeserializer.configure(serdeProps, false); - final Serde<PageViewByRegion> pageViewByRegionSerde = Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer); - - final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde)); + final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new JSONSerde<>())); - final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", - Consumed.with(Serdes.String(), userProfileSerde)); + final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>())); final KStream<WindowedPageViewByRegion, RegionCount> regionCount = views .leftJoin(users, (view, profile) -> { @@ -162,7 +206,7 @@ public class PageViewTypedDemo { return viewByRegion; }) .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion)) - .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde)) + .groupByKey(Serialized.with(Serdes.String(), new JSONSerde<>())) .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1))) .count() .toStream() @@ -179,7 +223,7 @@ public class PageViewTypedDemo { }); // write to the result topic - regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde)); + regionCount.to("streams-pageviewstats-typed-output"); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final CountDownLatch latch = new CountDownLatch(1); @@ -197,6 +241,7 @@ public class PageViewTypedDemo { streams.start(); latch.await(); } catch (final Throwable e) { + e.printStackTrace(); System.exit(1); } System.exit(0);