Upgrading to Samza 1.0 * Adding WikipediaStatsTaskApplication, WikipediaParserTaskApplication, and WikipediaFeedTaskApplication * Removing task.inputs and task.class from config.
* Adding app.class. Author: Ray Matharu <rmath...@linkedin.com> Reviewers: Prateek Maheshwari <pmaheshw...@apache.org> Closes #43 from rmatharu/upgrading Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/108b6d52 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/108b6d52 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/108b6d52 Branch: refs/heads/master Commit: 108b6d528ea7f70514f84ff8c05fd7ecd7bc3203 Parents: a096c75 Author: Ray Matharu <rmath...@linkedin.com> Authored: Wed Oct 24 17:58:10 2018 -0700 Committer: Prateek Maheshwari <pmaheshw...@apache.org> Committed: Wed Oct 24 17:58:10 2018 -0700 ---------------------------------------------------------------------- bin/deploy.sh | 2 +- build.gradle | 2 +- gradle.properties | 2 +- pom.xml | 9 +- src/main/config/wikipedia-feed.properties | 24 +----- src/main/config/wikipedia-parser.properties | 22 +---- src/main/config/wikipedia-stats.properties | 23 ++---- .../examples/azure/AzureZKLocalApplication.java | 2 +- .../application/WikipediaApplication.java | 16 +++- .../WikipediaFeedTaskApplication.java | 87 ++++++++++++++++++++ .../WikipediaParserTaskApplication.java | 72 ++++++++++++++++ .../WikipediaStatsTaskApplication.java | 68 +++++++++++++++ 12 files changed, 262 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/bin/deploy.sh ---------------------------------------------------------------------- diff --git a/bin/deploy.sh b/bin/deploy.sh index 3c3ada2..08c0601 100755 --- a/bin/deploy.sh +++ b/bin/deploy.sh @@ -23,4 +23,4 @@ base_dir=`pwd` mvn clean package mkdir -p $base_dir/deploy/samza -tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza +tar -xvf $base_dir/target/hello-samza-1.0.1-SNAPSHOT-dist.tar.gz -C 
$base_dir/deploy/samza http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 6201bc5..a897372 100644 --- a/build.gradle +++ b/build.gradle @@ -50,7 +50,7 @@ dependencies { compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION") compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION") compile(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION") - + compile(group: 'org.apache.samza', name: 'samza-azure', version: "$SAMZA_VERSION") explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION") runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION") http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/gradle.properties ---------------------------------------------------------------------- diff --git a/gradle.properties b/gradle.properties index 34a540a..bfc8582 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ * under the License. */ -SAMZA_VERSION=1.0.0-SNAPSHOT +SAMZA_VERSION=1.0.1 KAFKA_VERSION=0.11.0.2 HADOOP_VERSION=2.6.1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8659683..167ea1e 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ under the License. <groupId>org.apache.samza</groupId> <artifactId>hello-samza</artifactId> - <version>1.0.0-SNAPSHOT</version> + <version>1.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>Samza Example</name> <description> @@ -123,6 +123,11 @@ under the License. 
<artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.1</version> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> @@ -153,7 +158,7 @@ under the License. <properties> <!-- maven specific properties --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <samza.version>1.0.0-SNAPSHOT</samza.version> + <samza.version>1.0.1-SNAPSHOT</samza.version> <hadoop.version>2.6.1</hadoop.version> </properties> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-feed.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-feed.properties b/src/main/config/wikipedia-feed.properties index 180d749..9fee678 100644 --- a/src/main/config/wikipedia-feed.properties +++ b/src/main/config/wikipedia-feed.properties @@ -19,29 +19,11 @@ job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=wikipedia-feed -# YARN +# YARN package path yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz -# Task -task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask -task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews +# TaskApplication class +app.class=samza.examples.wikipedia.task.application.WikipediaFeedTaskApplication -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory - -# Wikipedia System -systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory -systems.wikipedia.host=irc.wikimedia.org -systems.wikipedia.port=6667 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=json -systems.kafka.consumer.zookeeper.connect=localhost:2181/ 
-systems.kafka.producer.bootstrap.servers=localhost:9092 - -# Job Coordinator -job.coordinator.system=kafka # Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model # See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details -job.coordinator.replication.factor=1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-parser.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties index e8f3fa0..d9614c2 100644 --- a/src/main/config/wikipedia-parser.properties +++ b/src/main/config/wikipedia-parser.properties @@ -22,23 +22,5 @@ job.name=wikipedia-parser # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz -# Task -task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask -task.inputs=kafka.wikipedia-raw - -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory - -# Systems -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=json -systems.kafka.streams.metrics.samza.msg.serde=metrics -systems.kafka.consumer.zookeeper.connect=localhost:2181/ -systems.kafka.consumer.auto.offset.reset=largest -systems.kafka.producer.bootstrap.servers=localhost:9092 - -# Job Coordinator -job.coordinator.system=kafka -# Normally, this would be 3, but we have only one broker. 
-job.coordinator.replication.factor=1 +# TaskApplication class +app.class=samza.examples.wikipedia.task.application.WikipediaParserTaskApplication \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/config/wikipedia-stats.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties index 0a1cf31..7da456b 100644 --- a/src/main/config/wikipedia-stats.properties +++ b/src/main/config/wikipedia-stats.properties @@ -19,12 +19,13 @@ job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=wikipedia-stats -# YARN +# YARN package path yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz -# Task -task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask -task.inputs=kafka.wikipedia-edits +# TaskApplication class +app.class=samza.examples.wikipedia.task.application.WikipediaStatsTaskApplication + +# Setting the window frequency in milliseconds task.window.ms=10000 # Metrics @@ -33,17 +34,10 @@ metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapsho metrics.reporter.snapshot.stream=kafka.metrics metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +# Serializers (used below in specifying the stores' serdes) serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory -# Systems -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=json -systems.kafka.consumer.zookeeper.connect=localhost:2181/ -systems.kafka.consumer.auto.offset.reset=largest -systems.kafka.producer.bootstrap.servers=localhost:9092 # Key-value storage 
stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory @@ -57,8 +51,3 @@ stores.wikipedia-stats.changelog.replication.factor=1 # Normally, we'd set this much higher, but we want things to look snappy in the demo. stores.wikipedia-stats.write.batch.size=0 stores.wikipedia-stats.object.cache.size=0 - -# Job Coordinator -job.coordinator.system=kafka -# Normally, this would be 3, but we have only one broker. -job.coordinator.replication.factor=1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/azure/AzureZKLocalApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java index 01075e2..462e389 100644 --- a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java +++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java @@ -23,7 +23,7 @@ import joptsimple.OptionSet; import org.apache.samza.config.Config; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.util.CommandLine; -import samza.examples.azure.AzureApplication; + public class AzureZKLocalApplication { http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java index 60bbe15..9077480 100644 --- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java @@ -87,7 +87,11 @@ public class WikipediaApplication implements StreamApplication, Serializable { @Override public void 
describe(StreamApplicationDescriptor appDescriptor) { + + // Define a SystemDescriptor for Wikipedia data WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667); + + // Define InputDescriptors for consuming wikipedia data WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor .getInputDescriptor("en-wikipedia") .withChannel("#en.wikipedia"); @@ -98,14 +102,19 @@ public class WikipediaApplication implements StreamApplication, Serializable { .getInputDescriptor("en-wikinews") .withChannel("#en.wikinews"); + // Define a system descriptor for Kafka KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka") .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + // Define an output descriptor KafkaOutputDescriptor<WikipediaStatsOutput> statsOutputDescriptor = kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats", new JsonSerdeV2<>(WikipediaStatsOutput.class)); + + // Set the default system descriptor to Kafka, so that it is used for all + // internal resources, e.g., kafka topic for checkpointing, coordinator stream. 
appDescriptor.withDefaultSystem(kafkaSystemDescriptor); MessageStream<WikipediaFeedEvent> wikipediaEvents = appDescriptor.getInputStream(wikipediaInputDescriptor); MessageStream<WikipediaFeedEvent> wiktionaryEvents = appDescriptor.getInputStream(wiktionaryInputDescriptor); @@ -155,7 +164,9 @@ public class WikipediaApplication implements StreamApplication, Serializable { // Update persisted total Integer editsAllTime = store.get(EDIT_COUNT_KEY); - if (editsAllTime == null) editsAllTime = 0; + if (editsAllTime == null) { + editsAllTime = 0; + } editsAllTime++; store.put(EDIT_COUNT_KEY, editsAllTime); @@ -185,8 +196,7 @@ public class WikipediaApplication implements StreamApplication, Serializable { */ private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) { WikipediaStats stats = statsWindowPane.getMessage(); - return new WikipediaStatsOutput( - stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts); + return new WikipediaStatsOutput(stats.edits, stats.totalEdits, stats.byteDiff, stats.titles.size(), stats.counts); } /** http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java new file mode 100644 index 0000000..12d29b0 --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaFeedTaskApplication.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package samza.examples.wikipedia.task.application; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.descriptors.TaskApplicationDescriptor; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; +import org.apache.samza.task.StreamTaskFactory; +import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor; +import samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor; +import samza.examples.wikipedia.task.WikipediaFeedStreamTask; + + +/** + * This TaskApplication is responsible for consuming data from wikipedia, wiktionary, and wikinews data sources, and + * merging them into a single Kafka topic called wikipedia-raw. 
+ * + * + */ +public class WikipediaFeedTaskApplication implements TaskApplication { + + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); + + @Override + public void describe(TaskApplicationDescriptor taskApplicationDescriptor) { + + // Define a SystemDescriptor for Wikipedia data + WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667); + + // Define InputDescriptors for consuming wikipedia data + WikipediaInputDescriptor wikipediaInputDescriptor = + wikipediaSystemDescriptor.getInputDescriptor("en-wikipedia").withChannel("#en.wikipedia"); + WikipediaInputDescriptor wiktionaryInputDescriptor = + wikipediaSystemDescriptor.getInputDescriptor("en-wiktionary").withChannel("#en.wiktionary"); + WikipediaInputDescriptor wikiNewsInputDescriptor = + wikipediaSystemDescriptor.getInputDescriptor("en-wikinews").withChannel("#en.wikinews"); + + // Define a system descriptor for Kafka, which is our output system + KafkaSystemDescriptor kafkaSystemDescriptor = + new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + // Define an output descriptor + KafkaOutputDescriptor kafkaOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor("wikipedia-raw", new JsonSerde<>()); + + // Set the default system descriptor to Kafka, so that it is used for all + // internal resources, e.g., kafka topic for checkpointing, coordinator stream. 
+ taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor); + + // Set the inputs + taskApplicationDescriptor.withInputStream(wikipediaInputDescriptor); + taskApplicationDescriptor.withInputStream(wiktionaryInputDescriptor); + taskApplicationDescriptor.withInputStream(wikiNewsInputDescriptor); + + // Set the output + taskApplicationDescriptor.withOutputStream(kafkaOutputDescriptor); + + // Set the task factory + taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaFeedStreamTask()); + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java new file mode 100644 index 0000000..5b6275b --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.wikipedia.task.application; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.descriptors.TaskApplicationDescriptor; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; +import org.apache.samza.task.StreamTaskFactory; +import samza.examples.wikipedia.task.WikipediaParserStreamTask; + + +public class WikipediaParserTaskApplication implements TaskApplication { + + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); + + @Override + public void describe(TaskApplicationDescriptor taskApplicationDescriptor) { + + // Define a system descriptor for Kafka, which is both our input and output system + KafkaSystemDescriptor kafkaSystemDescriptor = + new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + // Input descriptor for the wikipedia-raw topic + KafkaInputDescriptor kafkaInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor("wikipedia-raw", new JsonSerde<>()); + + // Output descriptor for the wikipedia-edits topic + KafkaOutputDescriptor kafkaOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor("wikipedia-edits", new JsonSerde<>()); + + // Set the default system descriptor to Kafka, so that it is used for all + 
// internal resources, e.g., kafka topic for checkpointing, coordinator stream. + taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor); + + // Set the input + taskApplicationDescriptor.withInputStream(kafkaInputDescriptor); + + // Set the output + taskApplicationDescriptor.withOutputStream(kafkaOutputDescriptor); + + // Set the task factory + taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaParserStreamTask()); + } +} + http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/108b6d52/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java b/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java new file mode 100644 index 0000000..68ecf4a --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/task/application/WikipediaStatsTaskApplication.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.wikipedia.task.application; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.descriptors.TaskApplicationDescriptor; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; +import org.apache.samza.task.StreamTaskFactory; +import samza.examples.wikipedia.task.WikipediaStatsStreamTask; + + +public class WikipediaStatsTaskApplication implements TaskApplication { + + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); + + @Override + public void describe(TaskApplicationDescriptor taskApplicationDescriptor) { + + // Define a system descriptor for Kafka + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka") + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + // Input descriptor for the wikipedia-edits topic + KafkaInputDescriptor kafkaInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor("wikipedia-edits", new JsonSerde<>()); + + // Set the default system descriptor to Kafka, so that it is used for all + // internal resources, e.g., kafka topic for checkpointing, coordinator stream. 
+ taskApplicationDescriptor.withDefaultSystem(kafkaSystemDescriptor); + + // Set the input + taskApplicationDescriptor.withInputStream(kafkaInputDescriptor); + + // Set the output + taskApplicationDescriptor.withOutputStream( + kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats", new JsonSerde<>())); + + // Set the task factory + taskApplicationDescriptor.withTaskFactory((StreamTaskFactory) () -> new WikipediaStatsStreamTask()); + } +} +