Repository: samza-hello-samza Updated Branches: refs/heads/latest 901c3a390 -> e5943a000
SAMZA-1441: Updated High Level API examples in hello-samza to provide serdes in code. Author: Chris Riccomini <[email protected]> Author: Yan Fang <[email protected]> Author: Prateek Maheshwari <[email protected]> Author: Yi Pan (Data Infrastructure) <[email protected]> Author: Aleksandar Pejakovic <[email protected]> Author: Navina Ramesh <[email protected]> Author: vjagadish1989 <[email protected]> Author: Steven Aerts <[email protected]> Author: Chris Riccomini <[email protected]> Author: Manikumar Reddy <[email protected]> Author: Yi Pan <[email protected]> Author: Yan Fang <[email protected]> Author: Xinyu Liu <[email protected]> Author: Stanislav Los <[email protected]> Author: Ken Gidley <[email protected]> Author: Jacob Maes <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]> Closes #24 from prateekm/serde-instance Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/e5943a00 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/e5943a00 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/e5943a00 Branch: refs/heads/latest Commit: e5943a000eef87e077c422e09dc20f09d4e876ca Parents: 901c3a3 Author: Prateek Maheshwari <[email protected]> Authored: Wed Oct 4 16:08:49 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Oct 4 16:08:49 2017 -0700 ---------------------------------------------------------------------- .../config/pageview-adclick-joiner.properties | 18 +--- src/main/config/pageview-filter.properties | 18 +--- src/main/config/pageview-sessionizer.properties | 18 +--- .../config/tumbling-pageview-counter.properties | 20 +--- ...ikipedia-application-local-runner.properties | 2 - .../config/wikipedia-application.properties | 4 +- .../java/samza/examples/cookbook/AdClick.java | 58 ---------- .../java/samza/examples/cookbook/PageView.java | 61 ----------- .../cookbook/PageViewAdClickJoiner.java | 
108 +++++++++++-------- .../examples/cookbook/PageViewFilterApp.java | 47 ++++---- .../cookbook/PageViewSessionizerApp.java | 54 +++++----- .../cookbook/TumblingPageViewCounterApp.java | 52 +++++---- .../samza/examples/cookbook/data/AdClick.java | 54 ++++++++++ .../samza/examples/cookbook/data/PageView.java | 46 ++++++++ .../examples/cookbook/data/UserPageViews.java | 51 +++++++++ .../application/WikipediaApplication.java | 98 ++++++++++------- 16 files changed, 371 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-adclick-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties index 81ec3f6..eba7b0b 100644 --- a/src/main/config/pageview-adclick-joiner.properties +++ b/src/main/config/pageview-adclick-joiner.properties @@ -18,29 +18,19 @@ # Job job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=pageview-adclick-joiner +job.container.count=2 +job.default.system=kafka +job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.PageViewAdClickJoiner -task.inputs=kafka.pageview-join-input,kafka.adclick-join-input task.window.ms=2000 -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory - # Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=string -systems.kafka.samza.key.serde=string systems.kafka.consumer.zookeeper.connect=localhost:2181 systems.kafka.producer.bootstrap.servers=localhost:9092 - -# Job Coordinator -job.coordinator.system=kafka 
-job.coordinator.replication.factor=1 - -job.default.system=kafka -job.container.count=2 +systems.kafka.default.stream.replication.factor=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-filter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties index b9e8d2a..331ee1a 100644 --- a/src/main/config/pageview-filter.properties +++ b/src/main/config/pageview-filter.properties @@ -18,29 +18,19 @@ # Job job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=pageview-filter +job.container.count=2 +job.default.system=kafka +job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.PageViewFilterApp -task.inputs=kafka.pageview-filter-input task.window.ms=2000 -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory - # Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=string -systems.kafka.samza.key.serde=string systems.kafka.consumer.zookeeper.connect=localhost:2181 systems.kafka.producer.bootstrap.servers=localhost:9092 - -# Job Coordinator -job.coordinator.system=kafka -job.coordinator.replication.factor=1 - -job.default.system=kafka -job.container.count=2 +systems.kafka.default.stream.replication.factor=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/pageview-sessionizer.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties index 847aa87..420cdde 100644 
--- a/src/main/config/pageview-sessionizer.properties +++ b/src/main/config/pageview-sessionizer.properties @@ -18,29 +18,19 @@ # Job job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=pageview-sessionizer +job.container.count=2 +job.default.system=kafka +job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.PageViewSessionizerApp -task.inputs=kafka.pageview-session-input task.window.ms=2000 -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory - # Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=string -systems.kafka.samza.key.serde=string systems.kafka.consumer.zookeeper.connect=localhost:2181 systems.kafka.producer.bootstrap.servers=localhost:9092 - -# Job Coordinator -job.coordinator.system=kafka -job.coordinator.replication.factor=1 - -job.default.system=kafka -job.container.count=2 +systems.kafka.default.stream.replication.factor=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/tumbling-pageview-counter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties index 09fb131..b58dbe9 100644 --- a/src/main/config/tumbling-pageview-counter.properties +++ b/src/main/config/tumbling-pageview-counter.properties @@ -18,29 +18,19 @@ # Job job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=tumbling-pageview-counter +job.container.count=2 +job.default.system=kafka +job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task 
app.class=samza.examples.cookbook.TumblingPageViewCounterApp -task.inputs=kafka.pageview-tumbling-input task.window.ms=2000 -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory - # Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=string -systems.kafka.samza.key.serde=string -systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.consumer.zookeeper.connect=localhost:2181 systems.kafka.producer.bootstrap.servers=localhost:9092 - -# Job Coordinator -job.coordinator.system=kafka -job.coordinator.replication.factor=1 - -job.default.system=kafka -job.container.count=2 +systems.kafka.default.stream.replication.factor=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application-local-runner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties index 1911e68..b770f13 100644 --- a/src/main/config/wikipedia-application-local-runner.properties +++ b/src/main/config/wikipedia-application-local-runner.properties @@ -25,7 +25,6 @@ job.coordinator.zk.connect=localhost:2181 task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory # Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory @@ -39,7 +38,6 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory systems.kafka.consumer.zookeeper.connect=localhost:2181/ systems.kafka.producer.bootstrap.servers=localhost:9092 
systems.kafka.default.stream.replication.factor=1 -systems.kafka.default.stream.samza.msg.serde=json # Streams streams.en-wikipedia.samza.system=wikipedia http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/config/wikipedia-application.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties index aeb8069..841fcc5 100644 --- a/src/main/config/wikipedia-application.properties +++ b/src/main/config/wikipedia-application.properties @@ -33,13 +33,11 @@ systems.wikipedia.port=6667 # Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.consumer.zookeeper.connect=localhost:2181 systems.kafka.producer.bootstrap.servers=localhost:9092 systems.kafka.default.stream.replication.factor=1 -systems.kafka.default.stream.samza.msg.serde=json # Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/AdClick.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java deleted file mode 100644 index 2d15cec..0000000 --- a/src/main/java/samza/examples/cookbook/AdClick.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package samza.examples.cookbook; - -/** - * Represents an ad click event. - */ -public class AdClick { - /* - * An unique identifier for the ad - */ - private final String adId; - /** - * The user that clicked the ad - */ - private final String userId; - /** - * The id of the page that the ad was served from - */ - private final String pageId; - - public AdClick(String message) { - String[] adClickFields = message.split(","); - this.adId = adClickFields[0]; - this.userId = adClickFields[1]; - this.pageId = adClickFields[2]; - } - - public String getAdId() { - return adId; - } - - public String getUserId() { - return userId; - } - - public String getPageId() { - return pageId; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageView.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java deleted file mode 100644 index 7803db7..0000000 --- a/src/main/java/samza/examples/cookbook/PageView.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package samza.examples.cookbook; - -/** - * Represents a Page view event - */ -class PageView { - /** - * The user that viewed the page - */ - private final String userId; - /** - * The region that the page was viewed from - */ - private final String country; - /** - * A trackingId for the page - */ - private final String pageId; - - /** - * Constructs a {@link PageView} from the provided string. - * - * @param message in the following CSV format - userId,country,url - */ - PageView(String message) { - String[] pageViewFields = message.split(","); - userId = pageViewFields[0]; - country = pageViewFields[1]; - pageId = pageViewFields[2]; - } - - String getUserId() { - return userId; - } - - String getCountry() { - return country; - } - - String getPageId() { - return pageId; - } -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java index 94c7bc3..4f491f7 100644 --- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java +++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java @@ -20,15 +20,18 @@ package samza.examples.cookbook; import 
org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.JoinFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import samza.examples.cookbook.data.AdClick; +import samza.examples.cookbook.data.PageView; import java.time.Duration; -import java.util.function.Function; /** * In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for @@ -41,75 +44,94 @@ import java.util.function.Function; * <ol> * <li> * Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/> - * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1 * </li> * <li> - * Run the application using the ./bin/run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> - * --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties) + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties * </li> * <li> * Produce some messages to the "pageview-join-input" topic <br/> * 
./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/> - * user1,india,google.com <br/> - * user2,china,yahoo.com + * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com"} * </li> * <li> * Produce some messages to the "adclick-join-input" topic with the same pageKey <br/> * ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/> - * adClickId1,user1,google.com <br/> - * adClickId2,user1,yahoo.com + * {"userId": "user1", "adId": "adClickId1", "pageId":"google.com"} <br/> + * {"userId": "user1", "adId": "adClickId2", "pageId":"yahoo.com"} * </li> * <li> - * Consume messages from the "pageview-adclick-join-output" topic (e.g. bin/kafka-console-consumer.sh) - * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/> - * --property print.key=true + * Consume messages from the "pageview-adclick-join-output" topic <br/> + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output --property print.key=true * </li> * </ol> * */ public class PageViewAdClickJoiner implements StreamApplication { - private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class); - private static final String INPUT_TOPIC1 = "pageview-join-input"; - private static final String INPUT_TOPIC2 = "adclick-join-input"; - + private static final String PAGEVIEW_TOPIC = "pageview-join-input"; + private static final String AD_CLICK_TOPIC = "adclick-join-input"; private static final String OUTPUT_TOPIC = "pageview-adclick-join-output"; @Override public void init(StreamGraph graph, Config config) { + StringSerde stringSerde = new StringSerde(); + JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class); + JsonSerdeV2<AdClick> adClickSerde = new JsonSerdeV2<>(AdClick.class); + 
JsonSerdeV2<JoinResult> joinResultSerde = new JsonSerdeV2<>(JoinResult.class); + + MessageStream<PageView> pageViews = graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde); + MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICK_TOPIC, adClickSerde); + OutputStream<JoinResult> joinResults = graph.getOutputStream(OUTPUT_TOPIC, joinResultSerde); - MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v); - MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v); + JoinFunction<String, PageView, AdClick, JoinResult> pageViewAdClickJoinFunction = + new JoinFunction<String, PageView, AdClick, JoinResult>() { + @Override + public JoinResult apply(PageView pageView, AdClick adClick) { + return new JoinResult(pageView.pageId, pageView.userId, pageView.country, adClick.getAdId()); + } - OutputStream<String, String, String> outputStream = graph - .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m); + @Override + public String getFirstKey(PageView pageView) { + return pageView.pageId; + } - Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId(); - Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId(); + @Override + public String getSecondKey(AdClick adClick) { + return adClick.getPageId(); + } + }; - MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn); - MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn); + MessageStream<PageView> repartitionedPageViews = + pageViews + .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde)) + .map(KV::getValue); - pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() { + MessageStream<AdClick> repartitionedAdClicks = + adClicks + .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde)) + .map(KV::getValue); - @Override - public 
String apply(String pageViewMsg, String adClickMsg) { - PageView pageView = new PageView(pageViewMsg); - AdClick adClick = new AdClick(adClickMsg); - String joinResult = String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId()); - return joinResult; - } + repartitionedPageViews + .join(repartitionedAdClicks, pageViewAdClickJoinFunction, + stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3)) + .sendTo(joinResults); + } - @Override - public String getFirstKey(String msg) { - return new PageView(msg).getPageId(); - } + static class JoinResult { + public String pageId; + public String userId; + public String country; + public String adId; - @Override - public String getSecondKey(String msg) { - return new AdClick(msg).getPageId(); - } - }, Duration.ofMinutes(3)).sendTo(outputStream); + public JoinResult(String pageId, String userId, String country, String adId) { + this.pageId = pageId; + this.userId = userId; + this.country = country; + this.adId = adId; + } } } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewFilterApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java index cb39553..80ce2d1 100644 --- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java +++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java @@ -20,14 +20,14 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.functions.FilterFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import 
java.util.function.Function; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import samza.examples.cookbook.data.PageView; /** * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream. @@ -39,48 +39,41 @@ import java.util.function.Function; * <ol> * <li> * Ensure that the topic "pageview-filter-input" is created <br/> - * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1 + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1 * </li> * <li> - * Run the application using the ./bin/run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> - * --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties) + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties * </li> * <li> * Produce some messages to the "pageview-filter-input" topic <br/> * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/> - * user1,india,google.com <br/> - * user2,china,yahoo.com + * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/> + * {"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com"} * </li> * <li> * Consume messages from the "pageview-filter-output" topic (e.g. 
bin/kafka-console-consumer.sh) - * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/> - * --property print.key=true </li> + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true + * </li> * </ol> - * */ public class PageViewFilterApp implements StreamApplication { - private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class); - private static final String FILTER_KEY = "badKey"; private static final String INPUT_TOPIC = "pageview-filter-input"; private static final String OUTPUT_TOPIC = "pageview-filter-output"; + private static final String INVALID_USER_ID = "invalidUserId"; @Override public void init(StreamGraph graph, Config config) { + graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class))); - MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v); - - Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId(); - - OutputStream<String, String, String> outputStream = graph - .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m); - - FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId()); + MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC); + OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC); pageViews - .partitionBy(keyFn) - .filter(filterFn) - .sendTo(outputStream); + .partitionBy(kv -> kv.value.userId, kv -> kv.value) + .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId)) + .sendTo(filteredPageViews); } } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java index 7ec4f9d..f1000ae 100644 --- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java +++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java @@ -20,21 +20,22 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import samza.examples.cookbook.data.PageView; +import samza.examples.cookbook.data.UserPageViews; import java.time.Duration; -import java.util.Collection; -import java.util.function.Function; /** * In this example, we group page views by userId into sessions, and compute the number of page views for each user - * session. A session is considered closed when there is no user activity for a 3 second duration. + * session. A session is considered closed when there is no user activity for a 10 second duration. * * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream. 
* @@ -43,45 +44,50 @@ import java.util.function.Function; * <ol> * <li> * Ensure that the topic "pageview-session-input" is created <br/> - * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1 + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1 * </li> * <li> - * Run the application using the ./bin/run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> - * --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties) + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties * </li> * <li> * Produce some messages to the "pageview-session-input" topic <br/> - * user1,india,google.com <br/> - * user2,china,yahoo.com + * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"} * </li> * <li> * Consume messages from the "pageview-session-output" topic (e.g. 
bin/kafka-console-consumer.sh) - * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-sessions-output <br/> - * --property print.key=true + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output --property print.key=true * </li> * </ol> * */ public class PageViewSessionizerApp implements StreamApplication { - private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class); private static final String INPUT_TOPIC = "pageview-session-input"; private static final String OUTPUT_TOPIC = "pageview-session-output"; @Override public void init(StreamGraph graph, Config config) { + graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class))); - MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v); - - OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph - .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString()); - - Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId(); + MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC); + OutputStream<KV<String, UserPageViews>> userPageViews = + graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class))); pageViews - .partitionBy(keyFn) - .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3))) - .sendTo(outputStream); + .partitionBy(kv -> kv.value.userId, kv -> kv.value) + .window(Windows.keyedSessionWindow(kv -> kv.value.userId, Duration.ofSeconds(10))) + .map(windowPane -> { + String userId = windowPane.getKey().getKey(); + int views = windowPane.getMessage().size(); + return KV.of(userId, new UserPageViews(userId, views)); + }) + .sendTo(userPageViews); } } 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java index 1bc6ff4..0809180 100644 --- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java +++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java @@ -20,18 +20,18 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import samza.examples.cookbook.data.PageView; +import samza.examples.cookbook.data.UserPageViews; import java.time.Duration; -import java.util.Collection; -import java.util.function.Function; /** * In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time @@ -46,45 +46,51 @@ import java.util.function.Function; * <ol> * <li> * Ensure that the topic "pageview-tumbling-input" is created <br/> - * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1 + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1 * </li> * <li> - * 
Run the application using the ./bin/run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> - * --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties) + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties * </li> * <li> - * Produce some messages to the "pageview-tumbling-input" topic <br/> + * Produce some messages to the "pageview-tumbling-input" topic, waiting for some time between messages <br/> ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/> - user1,india,google.com <br/> - * user2,china,yahoo.com + * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"} * </li> * <li> * Consume messages from the "pageview-tumbling-output" topic (e.g. 
bin/kafka-console-consumer.sh) - * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/> + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/> * </li> * </ol> * */ public class TumblingPageViewCounterApp implements StreamApplication { - private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class); private static final String INPUT_TOPIC = "pageview-tumbling-input"; private static final String OUTPUT_TOPIC = "pageview-tumbling-output"; @Override public void init(StreamGraph graph, Config config) { + graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class))); - MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v); - - OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph - .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString()); - - Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry(); + MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC); + OutputStream<KV<String, UserPageViews>> outputStream = + graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class))); pageViews - .partitionBy(keyFn) - .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1)) + .partitionBy(kv -> kv.value.userId, kv -> kv.value) + .window(Windows.keyedTumblingWindow( + kv -> kv.key, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1)) + .map(windowPane -> { + String userId = windowPane.getKey().getKey(); + int views = windowPane.getMessage(); + return KV.of(userId, new UserPageViews(userId, views)); + }) .sendTo(outputStream); } } 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/AdClick.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/data/AdClick.java b/src/main/java/samza/examples/cookbook/data/AdClick.java new file mode 100644 index 0000000..42d45dc --- /dev/null +++ b/src/main/java/samza/examples/cookbook/data/AdClick.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.cookbook.data; + +/** + * An ad click event. 
+ */ +public class AdClick { + + private String pageId; // the unique id of the page that the ad was clicked on + private String adId; // a unique id for the ad + private String userId; // the user that clicked the ad + + public String getPageId() { + return pageId; + } + + public void setPageId(String pageId) { + this.pageId = pageId; + } + + public String getAdId() { + return adId; + } + + public void setAdId(String adId) { + this.adId = adId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/PageView.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/data/PageView.java b/src/main/java/samza/examples/cookbook/data/PageView.java new file mode 100644 index 0000000..9640694 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/data/PageView.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package samza.examples.cookbook.data; + +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * A page view event + */ +public class PageView { + public final String userId; + public final String country; + public final String pageId; + + /** + * Constructs a page view event. + * + * @param pageId the id for the page that was viewed + * @param userId the user that viewed the page + * @param country the country that the page was viewed from + */ + public PageView( + @JsonProperty("pageId") String pageId, + @JsonProperty("userId") String userId, + @JsonProperty("countryId") String country) { + this.userId = userId; + this.country = country; + this.pageId = pageId; + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/cookbook/data/UserPageViews.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/data/UserPageViews.java b/src/main/java/samza/examples/cookbook/data/UserPageViews.java new file mode 100644 index 0000000..9e10a14 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/data/UserPageViews.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.cookbook.data; + + +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * User page view count. + */ +public class UserPageViews { + private final String userId; + private final int count; + + /** + * Constructs a user page view count. + * + * @param userId the id of the user viewing the pages + * @param count number of page views by the user + */ + public UserPageViews( + @JsonProperty("userId") String userId, + @JsonProperty("count") int count) { + this.userId = userId; + this.count = count; + } + + public String getUserId() { + return userId; + } + + public int getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/e5943a00/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java index c320209..736d934 100644 --- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java @@ -20,11 +20,6 @@ package samza.examples.wikipedia.application; import com.google.common.collect.ImmutableList; -import java.time.Duration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.metrics.Counter; @@ -34,6 +29,8 @@ import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.storage.kv.KeyValueStore; import 
org.apache.samza.task.TaskContext; import org.slf4j.Logger; @@ -41,6 +38,12 @@ import org.slf4j.LoggerFactory; import samza.examples.wikipedia.model.WikipediaParser; import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + /** * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as @@ -82,46 +85,32 @@ public class WikipediaApplication implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { + // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized. + graph.setDefaultSerde(new NoOpSerde<>()); + // Inputs // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent - // They are un-keyed, so the 'k' parameter to the msgBuilder is not used - MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v); - MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v); - MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v); + MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID); + MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID); + MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID); - // Output (also un-keyed, so no keyExtractor) - OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m); + // Output (also un-keyed) + OutputStream<WikipediaStatsOutput> wikipediaStats = + graph.getOutputStream(STATS_STREAM_ID, new JsonSerdeV2<>(WikipediaStatsOutput.class)); // 
Merge inputs - MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents)); + MessageStream<WikipediaFeedEvent> allWikipediaEvents = + MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents)); // Parse, update stats, prepare output, and send - allWikipediaEvents.map(WikipediaParser::parseEvent) + allWikipediaEvents + .map(WikipediaParser::parseEvent) .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator())) .map(this::formatOutput) .sendTo(wikipediaStats); } /** - * A few statistics about the incoming messages. - */ - private static class WikipediaStats { - // Windowed stats - int edits = 0; - int byteDiff = 0; - Set<String> titles = new HashSet<String>(); - Map<String, Integer> counts = new HashMap<String, Integer>(); - - // Total stats - int totalEdits = 0; - - @Override - public String toString() { - return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts); - } - } - - /** * Updates the windowed and total stats based on each "edit" event. * * Uses a KeyValueStore to persist a total edit count across restarts. @@ -177,17 +166,46 @@ public class WikipediaApplication implements StreamApplication { /** * Format the stats for output to Kafka. */ - private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) { - + private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) { WikipediaStats stats = statsWindowPane.getMessage(); + return new WikipediaStatsOutput( + stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts); + } + + /** + * A few statistics about the incoming messages. 
+ */ + private static class WikipediaStats { + // Windowed stats + int edits = 0; + int byteDiff = 0; + Set<String> titles = new HashSet<>(); + Map<String, Integer> counts = new HashMap<>(); + + // Total stats + int totalEdits = 0; - Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts); - counts.put("edits", stats.edits); - counts.put("edits-all-time", stats.totalEdits); - counts.put("bytes-added", stats.byteDiff); - counts.put("unique-titles", stats.titles.size()); + @Override + public String toString() { + return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts); + } + } - return counts; + static class WikipediaStatsOutput { + public int edits; + public int editsAllTime; + public int bytesAdded; + public int uniqueTitles; + public Map<String, Integer> counts; + + public WikipediaStatsOutput(int edits, int editsAllTime, int bytesAdded, int uniqueTitles, + Map<String, Integer> counts) { + this.edits = edits; + this.editsAllTime = editsAllTime; + this.bytesAdded = bytesAdded; + this.uniqueTitles = uniqueTitles; + this.counts = counts; + } } }
