This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d57fe1b  MINOR: single Jackson serde for PageViewTypedDemo (#5590)
d57fe1b is described below

commit d57fe1b053546966e6a867d84ee24dd256bb071a
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Fri Aug 31 15:13:42 2018 -0500

    MINOR: single Jackson serde for PageViewTypedDemo (#5590)
    
    Previously, we depicted creating a Jackson serde for every pojo class, which becomes a burden in practice. There are many ways to avoid this and just have a single serde, so we've decided to model this design choice instead.
    
    Reviewers: Viktor Somogyi <viktorsomo...@gmail.com>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 checkstyle/import-control.xml                      |   2 +-
 gradle/findbugs-exclude.xml                        |  17 ++
 .../examples/pageview/JsonPOJODeserializer.java    |  61 -------
 .../examples/pageview/JsonPOJOSerializer.java      |  55 -------
 .../examples/pageview/PageViewTypedDemo.java       | 175 +++++++++++++--------
 5 files changed, 128 insertions(+), 182 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 840e551..bd5c11f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -216,7 +216,7 @@
     <allow pkg="org.apache.kafka.streams"/>
 
     <subpackage name="examples">
-      <allow pkg="com.fasterxml.jackson.databind" />
+      <allow pkg="com.fasterxml.jackson" />
       <allow pkg="org.apache.kafka.connect.json" />
     </subpackage>
 
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 0998185..62240d8 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -292,4 +292,21 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
         </Or>
         <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
     </Match>
+
+    <!-- Suppress warnings for unused members that are undetectably used by Jackson -->
+    <Match>
+        <Package name="org.apache.kafka.streams.examples.pageview"/>
+        <Bug pattern="NP_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+    </Match>
+    <Match>
+        <Package name="org.apache.kafka.streams.examples.pageview"/>
+        <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
+    </Match>
+    <Match>
+        <Package name="org.apache.kafka.streams.examples.pageview"/>
+        <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+    </Match>
+
+    <!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
+
 </FindBugsFilter>
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
deleted file mode 100644
index d55246c..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.util.Map;
-
-public class JsonPOJODeserializer<T> implements Deserializer<T> {
-    private ObjectMapper objectMapper = new ObjectMapper();
-
-    private Class<T> tClass;
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonPOJODeserializer() {
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure(final Map<String, ?> props, final boolean isKey) {
-        tClass = (Class<T>) props.get("JsonPOJOClass");
-    }
-
-    @Override
-    public T deserialize(final String topic, final byte[] bytes) {
-        if (bytes == null)
-            return null;
-
-        final T data;
-        try {
-            data = objectMapper.readValue(bytes, tClass);
-        } catch (final Exception e) {
-            throw new SerializationException(e);
-        }
-
-        return data;
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
deleted file mode 100644
index 81ccf1e..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.util.Map;
-
-public class JsonPOJOSerializer<T> implements Serializer<T> {
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonPOJOSerializer() {
-    }
-    
-    @Override
-    public void configure(final Map<String, ?> props, final boolean isKey) {
-    }
-
-    @Override
-    public byte[] serialize(final String topic, final T data) {
-        if (data == null)
-            return null;
-
-        try {
-            return objectMapper.writeValueAsBytes(data);
-        } catch (final Exception e) {
-            throw new SerializationException("Error serializing JSON message", 
e);
-        }
-    }
-
-    @Override
-    public void close() {
-    }
-
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 503dbeb..871d836 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -16,23 +16,26 @@
  */
 package org.apache.kafka.streams.examples.pageview;
 
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -48,35 +51,122 @@ import java.util.concurrent.TimeUnit;
  * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
  *
  * Before running this example you must create the input topics and the output topic (e.g. via
- * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via
- * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via
+ * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic.
+ *
+ * The inputs for this example are:
+ * - Topic: streams-pageview-input
+ *   Key Format: (String) USER_ID
+ *   Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)}
+ *
+ * - Topic: streams-userprofile-input
+ *   Key Format: (String) USER_ID
+ *   Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)}
+ *
+ * To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
+ * - Topic: streams-pageviewstats-typed-output
+ *   Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)}
+ *   Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)}
+ *
+ * Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
+ * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field.
  */
+@SuppressWarnings({"WeakerAccess", "unused"})
 public class PageViewTypedDemo {
 
+    /**
+     * A serde for any class that implements {@link JSONSerdeCompatible}. Note that the classes also need to
+     * be registered in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}.
+     *
+     * @param <T> The concrete type of the class that gets de/serialized
+     */
+    public static class JSONSerde<T extends JSONSerdeCompatible> implements 
Serializer<T>, Deserializer<T>, Serde<T> {
+        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean 
isKey) {}
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T deserialize(final String topic, final byte[] data) {
+            if (data == null) {
+                return null;
+            }
+
+            try {
+                return (T) OBJECT_MAPPER.readValue(data, 
JSONSerdeCompatible.class);
+            } catch (final IOException e) {
+                throw new SerializationException(e);
+            }
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final T data) {
+            if (data == null) {
+                return null;
+            }
+
+            try {
+                return OBJECT_MAPPER.writeValueAsBytes(data);
+            } catch (final Exception e) {
+                throw new SerializationException("Error serializing JSON 
message", e);
+            }
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public Serializer<T> serializer() {
+            return this;
+        }
+
+        @Override
+        public Deserializer<T> deserializer() {
+            return this;
+        }
+    }
+
+    /**
+     * An interface for registering types that can be de/serialized with {@link JSONSerde}.
+     */
+    @SuppressWarnings("DefaultAnnotationParam") // being explicit for the 
example
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "_t")
+    @JsonSubTypes({
+                      @JsonSubTypes.Type(value = PageView.class, name = "pv"),
+                      @JsonSubTypes.Type(value = UserProfile.class, name = 
"up"),
+                      @JsonSubTypes.Type(value = PageViewByRegion.class, name 
= "pvbr"),
+                      @JsonSubTypes.Type(value = 
WindowedPageViewByRegion.class, name = "wpvbr"),
+                      @JsonSubTypes.Type(value = RegionCount.class, name = 
"rc")
+                  })
+    public interface JSONSerdeCompatible {
+
+    }
+
     // POJO classes
-    static public class PageView {
+    static public class PageView implements JSONSerdeCompatible {
         public String user;
         public String page;
         public Long timestamp;
     }
 
-    static public class UserProfile {
+    static public class UserProfile implements JSONSerdeCompatible {
         public String region;
         public Long timestamp;
     }
 
-    static public class PageViewByRegion {
+    static public class PageViewByRegion implements JSONSerdeCompatible {
         public String user;
         public String page;
         public String region;
     }
 
-    static public class WindowedPageViewByRegion {
+    static public class WindowedPageViewByRegion implements 
JSONSerdeCompatible {
         public long windowStart;
         public String region;
     }
 
-    static public class RegionCount {
+    static public class RegionCount implements JSONSerdeCompatible {
         public long count;
         public String region;
     }
@@ -86,67 +176,21 @@ public class PageViewTypedDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
JsonTimestampExtractor.class);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
JSONSerde.class);
+        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, 
JSONSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
JSONSerde.class);
+        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, 
JSONSerde.class);
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        // TODO: the following can be removed with a serialization factory
-        final Map<String, Object> serdeProps = new HashMap<>();
-
-        final Serializer<PageView> pageViewSerializer = new 
JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", PageView.class);
-        pageViewSerializer.configure(serdeProps, false);
-
-        final Deserializer<PageView> pageViewDeserializer = new 
JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", PageView.class);
-        pageViewDeserializer.configure(serdeProps, false);
-
-        final Serde<PageView> pageViewSerde = 
Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);
-
-        final Serializer<UserProfile> userProfileSerializer = new 
JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileSerializer.configure(serdeProps, false);
-
-        final Deserializer<UserProfile> userProfileDeserializer = new 
JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileDeserializer.configure(serdeProps, false);
-
-        final Serde<UserProfile> userProfileSerde = 
Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer);
-
-        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer 
= new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
-        wPageViewByRegionSerializer.configure(serdeProps, false);
-
-        final Deserializer<WindowedPageViewByRegion> 
wPageViewByRegionDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
-        wPageViewByRegionDeserializer.configure(serdeProps, false);
-
-        final Serde<WindowedPageViewByRegion> wPageViewByRegionSerde = 
Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer);
-
-        final Serializer<RegionCount> regionCountSerializer = new 
JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", RegionCount.class);
-        regionCountSerializer.configure(serdeProps, false);
-
-        final Deserializer<RegionCount> regionCountDeserializer = new 
JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", RegionCount.class);
-        regionCountDeserializer.configure(serdeProps, false);
-        final Serde<RegionCount> regionCountSerde = 
Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);
-
-        final Serializer<PageViewByRegion> pageViewByRegionSerializer = new 
JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
-        pageViewByRegionSerializer.configure(serdeProps, false);
-        final Deserializer<PageViewByRegion> pageViewByRegionDeserializer = 
new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
-        pageViewByRegionDeserializer.configure(serdeProps, false);
-        final Serde<PageViewByRegion> pageViewByRegionSerde = 
Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
-
-        final KStream<String, PageView> views = 
builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), 
pageViewSerde));
+        final KStream<String, PageView> views = 
builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new 
JSONSerde<>()));
 
-        final KTable<String, UserProfile> users = 
builder.table("streams-userprofile-input",
-                                                                
Consumed.with(Serdes.String(), userProfileSerde));
+        final KTable<String, UserProfile> users = 
builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new 
JSONSerde<>()));
 
         final KStream<WindowedPageViewByRegion, RegionCount> regionCount = 
views
             .leftJoin(users, (view, profile) -> {
@@ -162,7 +206,7 @@ public class PageViewTypedDemo {
                 return viewByRegion;
             })
             .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, 
viewRegion))
-            .groupByKey(Serialized.with(Serdes.String(), 
pageViewByRegionSerde))
+            .groupByKey(Serialized.with(Serdes.String(), new JSONSerde<>()))
             
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
             .count()
             .toStream()
@@ -179,7 +223,7 @@ public class PageViewTypedDemo {
             });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", 
Produced.with(wPageViewByRegionSerde, regionCountSerde));
+        regionCount.to("streams-pageviewstats-typed-output");
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
@@ -197,6 +241,7 @@ public class PageViewTypedDemo {
             streams.start();
             latch.await();
         } catch (final Throwable e) {
+            e.printStackTrace();
             System.exit(1);
         }
         System.exit(0);

Reply via email to