Repository: samza-hello-samza Updated Branches: refs/heads/latest 4d20c2bde -> c87ed565f
SAMZA-1225: Add demo examples for programming with the fluent API Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/c87ed565 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/c87ed565 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/c87ed565 Branch: refs/heads/latest Commit: c87ed565fbaebf2ac88376143c65e9f52f7a8801 Parents: 4d20c2b Author: vjagadish1989 <[email protected]> Authored: Wed Apr 19 16:52:22 2017 -0700 Committer: vjagadish1989 <[email protected]> Committed: Thu Apr 20 18:09:50 2017 -0700 ---------------------------------------------------------------------- bin/grid | 4 +- gradle.properties | 2 +- pom.xml | 6 +- src/main/assembly/src.xml | 20 ++++ .../config/pageview-adclick-joiner.properties | 46 ++++++++ src/main/config/pageview-filter.properties | 46 ++++++++ src/main/config/pageview-sessionizer.properties | 46 ++++++++ .../config/tumbling-pageview-counter.properties | 46 ++++++++ .../java/samza/examples/cookbook/AdClick.java | 58 ++++++++++ .../java/samza/examples/cookbook/PageView.java | 61 ++++++++++ .../cookbook/PageViewAdClickJoiner.java | 115 +++++++++++++++++++ .../examples/cookbook/PageViewFilterApp.java | 86 ++++++++++++++ .../cookbook/PageViewSessionizerApp.java | 87 ++++++++++++++ .../cookbook/TumblingPageViewCounterApp.java | 90 +++++++++++++++ 14 files changed, 707 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/bin/grid ---------------------------------------------------------------------- diff --git a/bin/grid b/bin/grid index ec9d210..7d2112b 100755 --- a/bin/grid +++ b/bin/grid @@ -35,7 +35,7 @@ DOWNLOAD_CACHE_DIR=$HOME/.samza/download COMMAND=$1 SYSTEM=$2 -DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz 
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz @@ -96,7 +96,7 @@ install_yarn() { install_kafka() { mkdir -p "$DEPLOY_ROOT_DIR" - install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.0.1 + install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1 # have to use SIGTERM since nohup on appears to ignore SIGINT # and Kafka switched to SIGINT in KAFKA-1031. sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/gradle.properties ---------------------------------------------------------------------- diff --git a/gradle.properties b/gradle.properties index 1bc7633..294875b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,7 +18,7 @@ */ SAMZA_VERSION=0.13.0-SNAPSHOT -KAFKA_VERSION=0.10.0.1 +KAFKA_VERSION=0.10.1.1 HADOOP_VERSION=2.6.1 SLF4J_VERSION = 1.7.7 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 100b2b1..9a0b54e 100644 --- a/pom.xml +++ b/pom.xml @@ -81,7 +81,7 @@ under the License. <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> - <version>0.10.0.1</version> + <version>0.10.1.1</version> </dependency> <dependency> <groupId>org.schwering</groupId> @@ -240,8 +240,8 @@ under the License. 
<artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>1.8</source> + <target>1.8</target> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml index 3f2e4a8..e280a9a 100644 --- a/src/main/assembly/src.xml +++ b/src/main/assembly/src.xml @@ -51,6 +51,26 @@ <outputDirectory>config</outputDirectory> <filtered>true</filtered> </file> + <file> + <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> + <file> + <source>${basedir}/src/main/config/pageview-sessionizer.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> + <file> + <source>${basedir}/src/main/config/pageview-filter.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> + <file> + <source>${basedir}/src/main/config/pageview-adclick-joiner.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> </files> <dependencySets> <dependencySet> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-adclick-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties new file mode 100644 index 0000000..81ec3f6 --- /dev/null +++ b/src/main/config/pageview-adclick-joiner.properties @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-adclick-joiner + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.PageViewAdClickJoiner +task.inputs=kafka.pageview-join-input,kafka.adclick-join-input +task.window.ms=2000 + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=string +systems.kafka.samza.key.serde=string +systems.kafka.consumer.zookeeper.connect=localhost:2181 +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# Job Coordinator +job.coordinator.system=kafka +job.coordinator.replication.factor=1 + +job.default.system=kafka +job.container.count=2 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-filter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties new file mode 100644 index 0000000..b9e8d2a --- /dev/null 
+++ b/src/main/config/pageview-filter.properties @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-filter + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.PageViewFilterApp +task.inputs=kafka.pageview-filter-input +task.window.ms=2000 + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=string +systems.kafka.samza.key.serde=string +systems.kafka.consumer.zookeeper.connect=localhost:2181 +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# Job Coordinator +job.coordinator.system=kafka +job.coordinator.replication.factor=1 + +job.default.system=kafka +job.container.count=2 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/pageview-sessionizer.properties ---------------------------------------------------------------------- diff --git 
a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties new file mode 100644 index 0000000..847aa87 --- /dev/null +++ b/src/main/config/pageview-sessionizer.properties @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-sessionizer + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.PageViewSessionizerApp +task.inputs=kafka.pageview-session-input +task.window.ms=2000 + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=string +systems.kafka.samza.key.serde=string +systems.kafka.consumer.zookeeper.connect=localhost:2181 +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# Job Coordinator +job.coordinator.system=kafka +job.coordinator.replication.factor=1 + +job.default.system=kafka +job.container.count=2 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/config/tumbling-pageview-counter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties new file mode 100644 index 0000000..09fb131 --- /dev/null +++ b/src/main/config/tumbling-pageview-counter.properties @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=tumbling-pageview-counter + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.TumblingPageViewCounterApp +task.inputs=kafka.pageview-tumbling-input +task.window.ms=2000 + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=string +systems.kafka.samza.key.serde=string +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# Job Coordinator +job.coordinator.system=kafka +job.coordinator.replication.factor=1 + +job.default.system=kafka +job.container.count=2 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/AdClick.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java new file mode 100644 index 0000000..2d15cec --- /dev/null +++ b/src/main/java/samza/examples/cookbook/AdClick.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.cookbook; + +/** + * Represents an ad click event. + */ +public class AdClick { + /** + * A unique identifier for the ad + */ + private final String adId; + /** + * The user that clicked the ad + */ + private final String userId; + /** + * The id of the page that the ad was served from + */ + private final String pageId; + + public AdClick(String message) { + String[] adClickFields = message.split(","); + this.adId = adClickFields[0]; + this.userId = adClickFields[1]; + this.pageId = adClickFields[2]; + } + + public String getAdId() { + return adId; + } + + public String getUserId() { + return userId; + } + + public String getPageId() { + return pageId; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageView.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java new file mode 100644 index 0000000..7803db7 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/PageView.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package samza.examples.cookbook; + +/** + * Represents a Page view event + */ +class PageView { + /** + * The user that viewed the page + */ + private final String userId; + /** + * The region that the page was viewed from + */ + private final String country; + /** + * A trackingId for the page + */ + private final String pageId; + + /** + * Constructs a {@link PageView} from the provided string. 
+ * + * @param message in the following CSV format - userId,country,url + */ + PageView(String message) { + String[] pageViewFields = message.split(","); + userId = pageViewFields[0]; + country = pageViewFields[1]; + pageId = pageViewFields[2]; + } + + String getUserId() { + return userId; + } + + String getCountry() { + return country; + } + + String getPageId() { + return pageId; + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java new file mode 100644 index 0000000..94c7bc3 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.cookbook; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.JoinFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.function.Function; + +/** + * In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for + * analysis on what pages served an Ad that was clicked. + * + * <p> Concepts covered: Performing stream to stream Joins. + * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/> + * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the ./bin/run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> + * --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties) + * </li> + * <li> + * Produce some messages to the "pageview-join-input" topic <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/> + * user1,india,google.com <br/> + * user2,china,yahoo.com + * </li> + * <li> + * Produce some messages to the "adclick-join-input" topic with the same pageKey <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/> + * adClickId1,user1,google.com <br/> + * adClickId2,user1,yahoo.com + * </li> + * <li> + * Consume messages from the "pageview-adclick-join-output" topic (e.g. 
bin/kafka-console-consumer.sh) + * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output <br/> + * --property print.key=true + * </li> + * </ol> + * + */ +public class PageViewAdClickJoiner implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(PageViewAdClickJoiner.class); + private static final String INPUT_TOPIC1 = "pageview-join-input"; + private static final String INPUT_TOPIC2 = "adclick-join-input"; + + private static final String OUTPUT_TOPIC = "pageview-adclick-join-output"; + + @Override + public void init(StreamGraph graph, Config config) { + + MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC1, (k, v) -> v); + MessageStream<String> adClicks = graph.<String, String, String>getInputStream(INPUT_TOPIC2, (k, v) -> v); + + OutputStream<String, String, String> outputStream = graph + .getOutputStream(OUTPUT_TOPIC, m -> "", m -> m); + + Function<String, String> pageViewKeyFn = pageView -> new PageView(pageView).getPageId(); + Function<String, String> adClickKeyFn = adClick -> new AdClick(adClick).getPageId(); + + MessageStream<String> pageViewRepartitioned = pageViews.partitionBy(pageViewKeyFn); + MessageStream<String> adClickRepartitioned = adClicks.partitionBy(adClickKeyFn); + + pageViewRepartitioned.join(adClickRepartitioned, new JoinFunction<String, String, String, String>() { + + @Override + public String apply(String pageViewMsg, String adClickMsg) { + PageView pageView = new PageView(pageViewMsg); + AdClick adClick = new AdClick(adClickMsg); + String joinResult = String.format("%s,%s,%s", pageView.getPageId(), pageView.getCountry(), adClick.getAdId()); + return joinResult; + } + + @Override + public String getFirstKey(String msg) { + return new PageView(msg).getPageId(); + } + + @Override + public String getSecondKey(String msg) { + return new AdClick(msg).getPageId(); + } + }, Duration.ofMinutes(3)).sendTo(outputStream); + } +} 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewFilterApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java new file mode 100644 index 0000000..cb39553 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package samza.examples.cookbook; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.FilterFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Function; + +/** + * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream. + * + * <p>Concepts covered: Using stateless operators on a stream, Re-partitioning a stream. 
+ * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topic "pageview-filter-input" is created <br/> + * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the ./bin/run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> + * --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties) + * </li> + * <li> + * Produce some messages to the "pageview-filter-input" topic <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/> + * user1,india,google.com <br/> + * user2,china,yahoo.com + * </li> + * <li> + * Consume messages from the "pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh) + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output <br/> + * --property print.key=true </li> + * </ol> + * + */ +public class PageViewFilterApp implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(PageViewFilterApp.class); + private static final String FILTER_KEY = "badKey"; + private static final String INPUT_TOPIC = "pageview-filter-input"; + private static final String OUTPUT_TOPIC = "pageview-filter-output"; + + @Override + public void init(StreamGraph graph, Config config) { + + MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v); + + Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId(); + + OutputStream<String, String, String> outputStream = graph + .getOutputStream(OUTPUT_TOPIC, keyFn, m -> m); + + FilterFunction<String> filterFn = pageView -> !FILTER_KEY.equals(new PageView(pageView).getUserId()); + + pageViews + .partitionBy(keyFn) + .filter(filterFn) + .sendTo(outputStream); + } +} 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java new file mode 100644 index 0000000..7ec4f9d --- /dev/null +++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.cookbook; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; +import java.util.function.Function; + +/** + * In this example, we group page views by userId into sessions, and compute the number of page views for each user + * session. A session is considered closed when there is no user activity for a 3 second duration. + * + * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream. + * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topic "pageview-session-input" is created <br/> + * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the ./bin/run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> + * --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties) + * </li> + * <li> + * Produce some messages to the "pageview-session-input" topic <br/> + * user1,india,google.com <br/> + * user2,china,yahoo.com + * </li> + * <li> + * Consume messages from the "pageview-session-output" topic (e.g. 
bin/kafka-console-consumer.sh) + * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output <br/> + * --property print.key=true + * </li> + * </ol> + * + */ +public class PageViewSessionizerApp implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(PageViewSessionizerApp.class); + private static final String INPUT_TOPIC = "pageview-session-input"; + private static final String OUTPUT_TOPIC = "pageview-session-output"; + + @Override + public void init(StreamGraph graph, Config config) { + + MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v); + + OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph + .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString()); + + Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId(); + + pageViews + .partitionBy(keyFn) + .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3))) + .sendTo(outputStream); + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c87ed565/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java new file mode 100644 index 0000000..1bc6ff4 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package samza.examples.cookbook; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; +import java.util.function.Function; + +/** + * In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time + * window. + * + * <p> Concepts covered: Performing Group-By style aggregations on tumbling time windows. + * + * <p> Tumbling windows divide a stream into a set of contiguous, fixed-sized, non-overlapping time intervals. 
+ * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topic "pageview-tumbling-input" is created <br/> + * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-tumbling-input --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the ./bin/run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> + * --config-path=file://$PWD/deploy/samza/config/tumbling-pageview-counter.properties) + * </li> + * <li> + * Produce some messages to the "pageview-tumbling-input" topic <br/> + ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-tumbling-input --broker-list localhost:9092 <br/> + user1,india,google.com <br/> + * user2,china,yahoo.com + * </li> + * <li> + * Consume messages from the "pageview-tumbling-output" topic (e.g. bin/kafka-console-consumer.sh) + * ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-tumbling-output --property print.key=true <br/> + * </li> + * </ol> + * + */ +public class TumblingPageViewCounterApp implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(TumblingPageViewCounterApp.class); + private static final String INPUT_TOPIC = "pageview-tumbling-input"; + private static final String OUTPUT_TOPIC = "pageview-tumbling-output"; + + @Override + public void init(StreamGraph graph, Config config) { + + MessageStream<String> pageViews = graph.<String, String, String>getInputStream(INPUT_TOPIC, (k, v) -> v); + + OutputStream<String, String, WindowPane<String, Integer>> outputStream = graph + .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> m.getMessage().toString()); + + Function<String, String> keyFn = pageView -> new PageView(pageView).getCountry(); + + pageViews + .partitionBy(keyFn) + .window(Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(3), () -> 0, (m, prevCount) -> prevCount + 1)) + 
.sendTo(outputStream); + } +}
