Merge latest with master
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/c838b97b Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/c838b97b Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/c838b97b Branch: refs/heads/master Commit: c838b97b36fab603ff183b6c77bf17b974b9cfb5 Parents: 8290590 Author: Jagadish <jvenkatra...@linkedin.com> Authored: Thu Dec 6 19:23:30 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Dec 6 19:26:47 2018 -0800 ---------------------------------------------------------------------- src/main/config/pageview-filter-sql.properties | 51 -------------- .../examples/cookbook/PageViewFilterSqlApp.java | 72 -------------------- src/main/schemas/OutputTopic.avsc | 39 ----------- src/main/schemas/PageViewStream.avsc | 32 --------- 4 files changed, 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c838b97b/src/main/config/pageview-filter-sql.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-filter-sql.properties b/src/main/config/pageview-filter-sql.properties deleted file mode 100644 index 49a4271..0000000 --- a/src/main/config/pageview-filter-sql.properties +++ /dev/null @@ -1,51 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=pageview-filter - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -app.class=samza.examples.cookbook.PageViewFilterSqlApp -app.runner.class=org.apache.samza.sql.runner.SamzaSqlApplicationRunner - -# Avro schema files used in the sql command. -schema.files=file://${basedir}/src/main/schemas/OutputTopic.avsc,file://${basedir}/src/main/schemas/PageViewStream.avsc - -# Samza sql configs -samza.sql.stmt=insert into kafka.ouputTopic select id, Name from PageViewStream - -# Serializers -serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory -serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.msg.serde=avro -systems.kafka.samza.key.serde=string -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.producer.bootstrap.servers=localhost:9092 - -# Job Coordinator -job.coordinator.system=kafka -job.coordinator.replication.factor=1 - -job.default.system=kafka -job.container.count=1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c838b97b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java deleted file mode 100644 index de01969..0000000 --- a/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java +++ /dev/null @@ -1,72 +0,0 @@ -package samza.examples.cookbook; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import org.apache.avro.Schema; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.sql.runner.SamzaSqlApplication; -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; - - -/** - * In this example, we demonstrate how to use SQL to create a samza job. - * - * <p>Concepts covered: Using sql to perform Stream processing. - * - * To run the below example: - * - * <ol> - * <li> - * Ensure that the topic "PageViewStream" is created <br/> - * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic PageViewStream --partitions 1 --replication-factor 1 - * </li> - * <li> - * Run the application using the ./bin/run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> - * --config-path=file://$PWD/deploy/samza/config/pageview-filter-sql.properties) - * </li> - * <li> - * Produce some messages to the "PageViewStream" topic <br/> - * Please follow instructions at https://github.com/srinipunuru/samzasqltools on how to produce events into PageViewStream<br/> - * </li> - * <li> - * Consume messages from the "pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh) - * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic outputTopic <br/> - * --property print.key=true </li> - * </ol> - * - */ - -public class PageViewFilterSqlApp extends SamzaSqlApplication { - - public static final String CFG_SCHEMA_FILES = "schema.files"; - private static final String CFG_SCHEMA_VALUE_FMT = ""; - - @Override - public void init(StreamGraph streamGraph, Config config) { - String sqlStmt = "insert into kafka.NewLinkedInEmployees select id, Name from ProfileChangeStream"; - String schemaFiles = config.get(CFG_SCHEMA_FILES); - HashMap<String, String> newConfig = new HashMap<>(); - newConfig.putAll(config); - populateSchemaConfigs(schemaFiles, newConfig); - newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sqlStmt); - super.init(streamGraph, new MapConfig(newConfig)); - } - - private void populateSchemaConfigs(String schemaFilesValue, HashMap<String, String> config) { - String[] schemaFiles = schemaFilesValue.split(","); - for (String schemaFileValue : schemaFiles) { - try { - File schemaFile = new File(schemaFileValue); - String schemaValue = Schema.parse(schemaFile).toString(); - config.put(String.format(CFG_SCHEMA_VALUE_FMT, schemaFile.getName()), schemaValue); - } catch (IOException e) { - throw new SamzaException("Unable to parse the schemaFile " + schemaFileValue, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c838b97b/src/main/schemas/OutputTopic.avsc ---------------------------------------------------------------------- diff --git a/src/main/schemas/OutputTopic.avsc b/src/main/schemas/OutputTopic.avsc deleted file mode 100644 index 7670b1b..0000000 --- a/src/main/schemas/OutputTopic.avsc +++ /dev/null @@ -1,39 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -{ - "name": "SimpleRecord", - "version" : 1, - "namespace": "org.apache.samza.sql.system.avro", - "type": "record", - "fields": [ - { - "name": "id", - "doc": "Record id.", - "type": ["null", "int"], - "default":null - }, - { - "name": "Name", - "doc" : "Some name.", - "type": ["null", "string"], - "default":null - } - ] -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/c838b97b/src/main/schemas/PageViewStream.avsc ---------------------------------------------------------------------- diff --git a/src/main/schemas/PageViewStream.avsc b/src/main/schemas/PageViewStream.avsc deleted file mode 100644 index 54936f7..0000000 --- a/src/main/schemas/PageViewStream.avsc +++ /dev/null @@ -1,32 +0,0 @@ -{ - "name": "PageViewEvent", - "version" : 1, - "namespace": "com.linkedin.samza.tools.avro", - "type": "record", - "fields": [ - { - "name": "id", - "doc": "Record id.", - "type": ["null", "int"], - "default":null - }, - { - "name": "Name", - "doc": "Name of the profile.", - "type": ["null", "string"], - "default":null - }, - { - "name": "ViewerName", - "doc": "Name of the person who viewed the profile.", - "type": ["null", "string"], - "default":null - }, - { - "name": "ProfileViewTimestamp", - "doc": "Time at which the profile was viewed.", - "type": ["null", "long"], - "default":null - } - ] -}