Repository: samza-hello-samza Updated Branches: refs/heads/master f48892747 -> 428f61393
Adding samza sql application Samza SQL application. Author: Srinivasulu Punuru <[email protected]> Reviewers: Aditya Toomula <[email protected]> Closes #41 from srinipunuru/sql.1 Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/428f6139 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/428f6139 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/428f6139 Branch: refs/heads/master Commit: 428f6139338a5967e90803b4bd2ac58883efef4e Parents: f488927 Author: Srinivasulu Punuru <[email protected]> Authored: Tue Oct 23 10:28:48 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Tue Oct 23 10:28:48 2018 -0700 ---------------------------------------------------------------------- build.gradle | 2 + src/main/config/pageview-filter-sql.properties | 51 ++++++++++++++ .../samza/examples/avro/AvroSerDeFactory.java | 73 ++++++++++++++++++++ .../examples/cookbook/PageViewFilterSqlApp.java | 72 +++++++++++++++++++ src/main/schemas/OutputTopic.avsc | 39 +++++++++++ src/main/schemas/PageViewStream.avsc | 32 +++++++++ 6 files changed, 269 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 21793c4..cce7699 100644 --- a/build.gradle +++ b/build.gradle @@ -49,11 +49,13 @@ dependencies { compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION") compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION") compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION") + compile(group: 'org.apache.samza', name: 'samza-sql', version: "$SAMZA_VERSION") explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: 
"$SAMZA_VERSION") runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION") runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION") + runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION") runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION") runtime(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION") http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/config/pageview-filter-sql.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-filter-sql.properties b/src/main/config/pageview-filter-sql.properties new file mode 100644 index 0000000..49a4271 --- /dev/null +++ b/src/main/config/pageview-filter-sql.properties @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-filter + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.PageViewFilterSqlApp +app.runner.class=org.apache.samza.sql.runner.SamzaSqlApplicationRunner + +# Avro schema files used in the sql command. +schema.files=file://${basedir}/src/main/schemas/OutputTopic.avsc,file://${basedir}/src/main/schemas/PageViewStream.avsc + +# Samza sql configs +samza.sql.stmt=insert into kafka.outputTopic select id, Name from PageViewStream + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=avro +systems.kafka.samza.key.serde=string +systems.kafka.consumer.zookeeper.connect=localhost:2181 +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# Job Coordinator +job.coordinator.system=kafka +job.coordinator.replication.factor=1 + +job.default.system=kafka +job.container.count=1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/java/samza/examples/avro/AvroSerDeFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/avro/AvroSerDeFactory.java b/src/main/java/samza/examples/avro/AvroSerDeFactory.java new file mode 100644 index 0000000..c96eedd --- /dev/null +++ b/src/main/java/samza/examples/avro/AvroSerDeFactory.java @@ -0,0 +1,73 @@ +package samza.examples.avro; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import
org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; + + +public class AvroSerDeFactory implements SerdeFactory { + + public static final String CFG_AVRO_SCHEMA = "serializers.avro.schema"; // final: a config-key constant must not be reassignable + + @Override + public Serde getSerde(String name, Config config) { // NOTE: name is ignored; every serde reads the same CFG_AVRO_SCHEMA key + return new AvroSerDe(config); + } + + private static class AvroSerDe implements Serde { // static: needs no reference to the enclosing factory + private final Schema schema; + + public AvroSerDe(Config config) { + schema = new Schema.Parser().parse(config.get(CFG_AVRO_SCHEMA)); // replaces the deprecated Schema.parse(String) + } + + @Override + public Object fromBytes(byte[] bytes) { + GenericRecord record; + try { + record = genericRecordFromBytes(bytes, schema); + } catch (IOException e) { + throw new SamzaException("Unable to deserialize the record", e); + } + return record; + } + + @Override + public byte[] toBytes(Object o) { + GenericRecord record = (GenericRecord) o; + try { + return encodeAvroGenericRecord(schema, record); + } catch (IOException e) { + throw new SamzaException("Unable to serialize the record", e); + } + } + } + + public static byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) throws IOException { // Avro-binary-encodes record using schema; static like genericRecordFromBytes + DatumWriter<IndexedRecord> msgDatumWriter = new GenericDatumWriter<>(schema); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(os, null); + msgDatumWriter.write(record, encoder); + encoder.flush(); // push any bytes held by the encoder into os before reading it + return os.toByteArray(); + } + + private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException { + BinaryDecoder binDecoder = DecoderFactory.get().binaryDecoder(bytes, null); // replaces deprecated defaultFactory().createBinaryDecoder + GenericDatumReader<T> reader = new GenericDatumReader<>(schema); +
return reader.read(null, binDecoder); + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java new file mode 100644 index 0000000..de01969 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java @@ -0,0 +1,72 @@ +package samza.examples.cookbook; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import org.apache.avro.Schema; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.sql.runner.SamzaSqlApplication; +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; + + +/** + * In this example, we demonstrate how to use SQL to create a samza job. + * + * <p>Concepts covered: Using sql to perform Stream processing. + * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topic "PageViewStream" is created <br/> + * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic PageViewStream --partitions 1 --replication-factor 1 + * </li> + * <li> + * Run the application using the ./bin/run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/> + * --config-path=file://$PWD/deploy/samza/config/pageview-filter-sql.properties + * </li> + * <li> + * Produce some messages to the "PageViewStream" topic <br/> + * Please follow instructions at https://github.com/srinipunuru/samzasqltools on how to produce events into PageViewStream<br/> + * </li> + * <li> + * Consume messages from the "outputTopic" topic (e.g.
bin/kafka-console-consumer.sh) + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic outputTopic <br/> + * --property print.key=true </li> + * </ol> + * + */ + +public class PageViewFilterSqlApp extends SamzaSqlApplication { + + public static final String CFG_SCHEMA_FILES = "schema.files"; + private static final String CFG_SCHEMA_VALUE_FMT = ""; + + @Override + public void init(StreamGraph streamGraph, Config config) { + String sqlStmt = "insert into kafka.NewLinkedInEmployees select id, Name from ProfileChangeStream"; + String schemaFiles = config.get(CFG_SCHEMA_FILES); + HashMap<String, String> newConfig = new HashMap<>(); + newConfig.putAll(config); + populateSchemaConfigs(schemaFiles, newConfig); + newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sqlStmt); + super.init(streamGraph, new MapConfig(newConfig)); + } + + private void populateSchemaConfigs(String schemaFilesValue, HashMap<String, String> config) { + String[] schemaFiles = schemaFilesValue.split(","); + for (String schemaFileValue : schemaFiles) { + try { + File schemaFile = new File(schemaFileValue); + String schemaValue = Schema.parse(schemaFile).toString(); + config.put(String.format(CFG_SCHEMA_VALUE_FMT, schemaFile.getName()), schemaValue); + } catch (IOException e) { + throw new SamzaException("Unable to parse the schemaFile " + schemaFileValue, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/schemas/OutputTopic.avsc ---------------------------------------------------------------------- diff --git a/src/main/schemas/OutputTopic.avsc b/src/main/schemas/OutputTopic.avsc new file mode 100644 index 0000000..7670b1b --- /dev/null +++ b/src/main/schemas/OutputTopic.avsc @@ -0,0 +1,39 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +{ + "name": "SimpleRecord", + "version" : 1, + "namespace": "org.apache.samza.sql.system.avro", + "type": "record", + "fields": [ + { + "name": "id", + "doc": "Record id.", + "type": ["null", "int"], + "default":null + }, + { + "name": "Name", + "doc" : "Some name.", + "type": ["null", "string"], + "default":null + } + ] +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/schemas/PageViewStream.avsc ---------------------------------------------------------------------- diff --git a/src/main/schemas/PageViewStream.avsc b/src/main/schemas/PageViewStream.avsc new file mode 100644 index 0000000..54936f7 --- /dev/null +++ b/src/main/schemas/PageViewStream.avsc @@ -0,0 +1,32 @@ +{ + "name": "PageViewEvent", + "version" : 1, + "namespace": "com.linkedin.samza.tools.avro", + "type": "record", + "fields": [ + { + "name": "id", + "doc": "Record id.", + "type": ["null", "int"], + "default":null + }, + { + "name": "Name", + "doc": "Name of the profile.", + "type": ["null", "string"], + "default":null + }, + { + "name": "ViewerName", + "doc": "Name of the person who viewed the profile.", + "type": ["null", "string"], + "default":null + }, + { + "name": "ProfileViewTimestamp", + "doc": "Time at which the profile was viewed.", + "type": ["null", "long"], + "default":null + } + ] +}
