This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d57fe1b MINOR: single Jackson serde for PageViewTypedDemo (#5590)
d57fe1b is described below
commit d57fe1b053546966e6a867d84ee24dd256bb071a
Author: John Roesler <[email protected]>
AuthorDate: Fri Aug 31 15:13:42 2018 -0500
MINOR: single Jackson serde for PageViewTypedDemo (#5590)
Previously, the example demonstrated creating a separate Jackson serde for
every POJO class, which becomes a burden in practice. There are many ways to
avoid this and use just a single serde, so the example now models that design
choice instead.
Reviewers: Viktor Somogyi <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
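
For readers skimming the thread, here is a condensed, self-contained sketch of the single-serde approach. It mirrors the JSONSerde added in the diff below; the SingleSerdeSketch wrapper class is purely illustrative, and only PageView is registered for brevity (the actual change registers all five POJO types):

    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    import java.io.IOException;
    import java.util.Map;

    // Illustrative wrapper class; in the real example these are nested members of PageViewTypedDemo.
    public class SingleSerdeSketch {

        // Marker interface: every POJO that should flow through the single serde implements it
        // and registers itself here, so Jackson can resolve the "_t" type tag back to a class.
        @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
        @JsonSubTypes({@JsonSubTypes.Type(value = PageView.class, name = "pv")})
        public interface JSONSerdeCompatible { }

        public static class PageView implements JSONSerdeCompatible {
            public String user;
            public String page;
            public Long timestamp;
        }

        // One class acts as Serializer, Deserializer, and Serde for all registered POJO types.
        public static class JSONSerde<T extends JSONSerdeCompatible>
                implements Serializer<T>, Deserializer<T>, Serde<T> {
            private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

            @Override
            public void configure(final Map<String, ?> configs, final boolean isKey) { }

            @Override
            public byte[] serialize(final String topic, final T data) {
                try {
                    return data == null ? null : OBJECT_MAPPER.writeValueAsBytes(data);
                } catch (final Exception e) {
                    throw new SerializationException("Error serializing JSON message", e);
                }
            }

            @SuppressWarnings("unchecked")
            @Override
            public T deserialize(final String topic, final byte[] data) {
                try {
                    // The "_t" field written during serialization tells Jackson which subtype to build.
                    return data == null ? null : (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
                } catch (final IOException e) {
                    throw new SerializationException(e);
                }
            }

            @Override
            public void close() { }

            @Override
            public Serializer<T> serializer() {
                return this;
            }

            @Override
            public Deserializer<T> deserializer() {
                return this;
            }
        }
    }

Because one serde now covers every record type, the demo can register it once through the default serde configs (see the StreamsConfig.DEFAULT_*_SERDE_* lines in the diff) instead of constructing and wiring a serde per class.
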
---
checkstyle/import-control.xml | 2 +-
gradle/findbugs-exclude.xml | 17 ++
.../examples/pageview/JsonPOJODeserializer.java | 61 -------
.../examples/pageview/JsonPOJOSerializer.java | 55 -------
.../examples/pageview/PageViewTypedDemo.java | 175 +++++++++++++--------
5 files changed, 128 insertions(+), 182 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 840e551..bd5c11f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -216,7 +216,7 @@
<allow pkg="org.apache.kafka.streams"/>
<subpackage name="examples">
- <allow pkg="com.fasterxml.jackson.databind" />
+ <allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.connect.json" />
</subpackage>
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 0998185..62240d8 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -292,4 +292,21 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
</Or>
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
</Match>
+
+ <!-- Suppress warnings for unused members that are undetectably used by Jackson -->
+ <Match>
+ <Package name="org.apache.kafka.streams.examples.pageview"/>
+ <Bug pattern="NP_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+ </Match>
+ <Match>
+ <Package name="org.apache.kafka.streams.examples.pageview"/>
+ <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
+ </Match>
+ <Match>
+ <Package name="org.apache.kafka.streams.examples.pageview"/>
+ <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+ </Match>
+
+ <!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
+
</FindBugsFilter>
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
deleted file mode 100644
index d55246c..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.util.Map;
-
-public class JsonPOJODeserializer<T> implements Deserializer<T> {
- private ObjectMapper objectMapper = new ObjectMapper();
-
- private Class<T> tClass;
-
- /**
- * Default constructor needed by Kafka
- */
- public JsonPOJODeserializer() {
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void configure(final Map<String, ?> props, final boolean isKey) {
- tClass = (Class<T>) props.get("JsonPOJOClass");
- }
-
- @Override
- public T deserialize(final String topic, final byte[] bytes) {
- if (bytes == null)
- return null;
-
- final T data;
- try {
- data = objectMapper.readValue(bytes, tClass);
- } catch (final Exception e) {
- throw new SerializationException(e);
- }
-
- return data;
- }
-
- @Override
- public void close() {
-
- }
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
deleted file mode 100644
index 81ccf1e..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.util.Map;
-
-public class JsonPOJOSerializer<T> implements Serializer<T> {
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- /**
- * Default constructor needed by Kafka
- */
- public JsonPOJOSerializer() {
- }
-
- @Override
- public void configure(final Map<String, ?> props, final boolean isKey) {
- }
-
- @Override
- public byte[] serialize(final String topic, final T data) {
- if (data == null)
- return null;
-
- try {
- return objectMapper.writeValueAsBytes(data);
- } catch (final Exception e) {
- throw new SerializationException("Error serializing JSON message", e);
- }
- }
-
- @Override
- public void close() {
- }
-
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 503dbeb..871d836 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -16,23 +16,26 @@
*/
package org.apache.kafka.streams.examples.pageview;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -48,35 +51,122 @@ import java.util.concurrent.TimeUnit;
* is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
*
* Before running this example you must create the input topics and the output topic (e.g. via
- * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via
- * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via
+ * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic.
+ *
+ * The inputs for this example are:
+ * - Topic: streams-pageview-input
+ * Key Format: (String) USER_ID
+ * Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)}
+ *
+ * - Topic: streams-userprofile-input
+ * Key Format: (String) USER_ID
+ * Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)}
+ *
+ * To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
+ * - Topic: streams-pageviewstats-typed-output
+ * Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)}
+ * Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)}
+ *
+ * Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
+ * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field.
*/
+@SuppressWarnings({"WeakerAccess", "unused"})
public class PageViewTypedDemo {
+ /**
+ * A serde for any class that implements {@link JSONSerdeCompatible}. Note that the classes also need to
+ * be registered in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}.
+ *
+ * @param <T> The concrete type of the class that gets de/serialized
+ */
+ public static class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T deserialize(final String topic, final byte[] data) {
+ if (data == null) {
+ return null;
+ }
+
+ try {
+ return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
+ } catch (final IOException e) {
+ throw new SerializationException(e);
+ }
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final T data) {
+ if (data == null) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.writeValueAsBytes(data);
+ } catch (final Exception e) {
+ throw new SerializationException("Error serializing JSON message", e);
+ }
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public Serializer<T> serializer() {
+ return this;
+ }
+
+ @Override
+ public Deserializer<T> deserializer() {
+ return this;
+ }
+ }
+
+ /**
+ * An interface for registering types that can be de/serialized with {@link JSONSerde}.
+ */
+ @SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
+ @JsonSubTypes({
+ @JsonSubTypes.Type(value = PageView.class, name = "pv"),
+ @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
+ @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
+ @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
+ @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
+ })
+ public interface JSONSerdeCompatible {
+
+ }
+
// POJO classes
- static public class PageView {
+ static public class PageView implements JSONSerdeCompatible {
public String user;
public String page;
public Long timestamp;
}
- static public class UserProfile {
+ static public class UserProfile implements JSONSerdeCompatible {
public String region;
public Long timestamp;
}
- static public class PageViewByRegion {
+ static public class PageViewByRegion implements JSONSerdeCompatible {
public String user;
public String page;
public String region;
}
- static public class WindowedPageViewByRegion {
+ static public class WindowedPageViewByRegion implements JSONSerdeCompatible {
public long windowStart;
public String region;
}
- static public class RegionCount {
+ static public class RegionCount implements JSONSerdeCompatible {
public long count;
public String region;
}
@@ -86,67 +176,21 @@ public class PageViewTypedDemo {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
+ props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, JSONSerde.class);
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
+ props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JSONSerde.class);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final StreamsBuilder builder = new StreamsBuilder();
- // TODO: the following can be removed with a serialization factory
- final Map<String, Object> serdeProps = new HashMap<>();
-
- final Serializer<PageView> pageViewSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", PageView.class);
- pageViewSerializer.configure(serdeProps, false);
-
- final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", PageView.class);
- pageViewDeserializer.configure(serdeProps, false);
-
- final Serde<PageView> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);
-
- final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", UserProfile.class);
- userProfileSerializer.configure(serdeProps, false);
-
- final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", UserProfile.class);
- userProfileDeserializer.configure(serdeProps, false);
-
- final Serde<UserProfile> userProfileSerde = Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer);
-
- final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
- wPageViewByRegionSerializer.configure(serdeProps, false);
-
- final Deserializer<WindowedPageViewByRegion> wPageViewByRegionDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
- wPageViewByRegionDeserializer.configure(serdeProps, false);
-
- final Serde<WindowedPageViewByRegion> wPageViewByRegionSerde = Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer);
-
- final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", RegionCount.class);
- regionCountSerializer.configure(serdeProps, false);
-
- final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", RegionCount.class);
- regionCountDeserializer.configure(serdeProps, false);
- final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);
-
- final Serializer<PageViewByRegion> pageViewByRegionSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
- pageViewByRegionSerializer.configure(serdeProps, false);
- final Deserializer<PageViewByRegion> pageViewByRegionDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
- pageViewByRegionDeserializer.configure(serdeProps, false);
- final Serde<PageViewByRegion> pageViewByRegionSerde = Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
-
- final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde));
+ final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
- final KTable<String, UserProfile> users = builder.table("streams-userprofile-input",
- Consumed.with(Serdes.String(), userProfileSerde));
+ final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
final KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
.leftJoin(users, (view, profile) -> {
@@ -162,7 +206,7 @@ public class PageViewTypedDemo {
return viewByRegion;
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
- .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
+ .groupByKey(Serialized.with(Serdes.String(), new JSONSerde<>()))
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
.count()
.toStream()
@@ -179,7 +223,7 @@ public class PageViewTypedDemo {
});
// write to the result topic
- regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
+ regionCount.to("streams-pageviewstats-typed-output");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
@@ -197,6 +241,7 @@ public class PageViewTypedDemo {
streams.start();
latch.await();
} catch (final Throwable e) {
+ e.printStackTrace();
System.exit(1);
}
System.exit(0);