Documentation for Samza SQL

**Samza tools** :
Contains the following tools that can be used for playing with Samza SQL or any 
other Samza job.

1. Generate kafka events : Tool used to generate avro serialized kafka events
2. Event hub consumer : Tool used to consume events from event hubs topic. This 
can be used if the samza job writes events to event hubs.
3. Samza sql console : Tool used to execute SQL using samza sql.

Adds documentation on how to use Samza SQL on a local machine and on a yarn 
environment and their associated Samza tooling.

https://issues.apache.org/jira/browse/SAMZA-1526

Author: Srinivasulu Punuru <[email protected]>

Reviewers: Yi Pan<[email protected]>, Jagadish<[email protected]>

Closes #374 from srinipunuru/docs.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/804aea2b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/804aea2b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/804aea2b

Branch: refs/heads/0.14.0
Commit: 804aea2b957fbd30a2b79d5a07fbb168e2871244
Parents: ef506a3
Author: Srinivasulu Punuru <[email protected]>
Authored: Tue Dec 12 10:46:01 2017 -0800
Committer: xiliu <[email protected]>
Committed: Fri Dec 22 10:48:45 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  28 +++
 docs/README.md                                  |   4 +-
 docs/learn/tutorials/versioned/index.md         |   3 +
 docs/learn/tutorials/versioned/samza-sql.md     | 123 +++++++++++
 docs/learn/tutorials/versioned/samza-tools.md   | 109 ++++++++++
 docs/startup/download/index.md                  |   7 +
 gradle/dependency-versions.gradle               |   1 +
 .../apache/samza/sql/avro/AvroRelConverter.java |   6 +-
 samza-tools/config/eh-consumer-log4j.xml        |  35 ++++
 .../config/generate-kafka-events-log4j.xml      |  35 ++++
 samza-tools/config/samza-sql-console-log4j.xml  |  35 ++++
 samza-tools/scripts/eh-consumer.sh              |  34 +++
 samza-tools/scripts/generate-kafka-events.sh    |  34 +++
 samza-tools/scripts/samza-sql-console.sh        |  34 +++
 .../apache/samza/tools/CommandLineHelper.java   |  42 ++++
 .../tools/ConsoleLoggingSystemFactory.java      | 126 ++++++++++++
 .../samza/tools/EventHubConsoleConsumer.java    | 120 +++++++++++
 .../apache/samza/tools/GenerateKafkaEvents.java | 205 +++++++++++++++++++
 .../samza/tools/RandomValueGenerator.java       |  87 ++++++++
 .../org/apache/samza/tools/SamzaSqlConsole.java | 188 +++++++++++++++++
 .../tools/avro/AvroSchemaGenRelConverter.java   |  94 +++++++++
 .../avro/AvroSchemaGenRelConverterFactory.java  |  43 ++++
 .../samza/tools/avro/AvroSerDeFactory.java      |  96 +++++++++
 .../tools/json/JsonRelConverterFactory.java     |  93 +++++++++
 .../samza/tools/schemas/PageViewEvent.avsc      |  51 +++++
 .../samza/tools/schemas/PageViewEvent.java      |  60 ++++++
 .../samza/tools/schemas/ProfileChangeEvent.avsc |  51 +++++
 .../samza/tools/schemas/ProfileChangeEvent.java |  60 ++++++
 .../apache/samza/tools/udf/RegexMatchUdf.java   |  40 ++++
 samza-tools/src/main/resources/log4j.xml        |  43 ++++
 settings.gradle                                 |   3 +-
 31 files changed, 1886 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 50cc5e0..860316e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -328,6 +328,34 @@ project(':samza-sql') {
   }
 }
 
+project(':samza-tools') {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(':samza-sql')
+    compile project(':samza-api')
+    compile project(':samza-azure')
+    compile "log4j:log4j:$log4jVersion"
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "org.slf4j:slf4j-log4j12:$slf4jVersion"
+    compile "commons-cli:commons-cli:$commonsCliVersion"
+    compile "org.apache.avro:avro:$avroVersion"
+    compile "org.apache.commons:commons-lang3:$commonsLang3Version"
+    compile "org.apache.kafka:kafka-clients:$kafkaVersion"
+  }
+
+  tasks.create(name: "releaseToolsTarGz", dependsOn: 
configurations.archives.artifacts, type: Tar) {
+    into "samza-tools-${version}"
+    compression = Compression.GZIP
+    from(project.file("./scripts")) { into "scripts/" }
+    from(project.file("./config")) { into "config/" }
+    from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into 
"scripts/" }
+    from(configurations.runtime) { into("lib/") }
+    from(configurations.archives.artifacts.files) { into("lib/") }
+    duplicatesStrategy 'exclude'
+  }
+}
+
 project(":samza-kafka_$scalaVersion") {
   apply plugin: 'scala'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 21a4991..3de2a78 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -83,8 +83,8 @@ Following can be done when updating the gradle.properties file
 
     * if this is a major release, add the x.x.x release to Archive category in 
docs/_layouts/default.html and x.x.x release part in docs/archive/index.html
 
-    * update the download page to use x.x.x release
-      * docs/startup/download/index.md
+    * update the download page (docs/startup/download/index.md) to use x.x.x 
release
+      * Add an entry to the Sources releases and Samza Tools section to use 
the new x.x.x release
 
     * update the version number in "tar -xvf 
./target/hello-samza-y.y.y-dist.tar.gz -C deploy/samza" in each of the 
tutorials (and search for other uses of version x.x.x which may need to be 
replaced with y.y.y)
       * docs/startup/hello-samza/versioned/index.md

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/docs/learn/tutorials/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/index.md 
b/docs/learn/tutorials/versioned/index.md
index a9ac6a7..d155b82 100644
--- a/docs/learn/tutorials/versioned/index.md
+++ b/docs/learn/tutorials/versioned/index.md
@@ -42,6 +42,9 @@ title: Tutorials
 
 [Samza Async API and Multithreading User Guide](samza-async-user-guide.html)
 
+[Samza SQL User Guide](samza-sql.html)
+
+
 <!-- TODO a bunch of tutorials
 [Log Walkthrough](log-walkthrough.html)
 <a href="configuring-kafka-system.html">Configuring a Kafka System</a><br/>

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/docs/learn/tutorials/versioned/samza-sql.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/samza-sql.md 
b/docs/learn/tutorials/versioned/samza-sql.md
new file mode 100644
index 0000000..fa79a1b
--- /dev/null
+++ b/docs/learn/tutorials/versioned/samza-sql.md
@@ -0,0 +1,123 @@
+---
+layout: page
+title: How to use Samza SQL
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+There are a couple of ways to use Samza SQL:
+
+1. Run Samza SQL on your local machine.
+2. Run Samza SQL on YARN.
+
+# Running Samza SQL on your local machine  
+
+
+Samza SQL console tool documented [here](samza-tools.html) uses Samza 
standalone to run the Samza SQL on your local machine. This is the quickest way 
to play with Samza SQL. Please follow the instructions [here](samza-tools.html) 
to get access to the Samza tools on your machine.
+
+## Start the Kafka server
+
+Please follow the instructions from the [Kafka 
quickstart](http://kafka.apache.org/quickstart) to start the zookeeper and 
Kafka server.
+
+## Create ProfileChangeStream Kafka topic
+
+The SQL statements below require a topic named ProfileChangeStream to be 
created on the Kafka broker. You can follow the instructions in the [Kafka 
quick start guide](http://kafka.apache.org/quickstart) to create a topic named 
"ProfileChangeStream".
+
+{% highlight bash %}
+./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
1 --partitions 1 --topic ProfileChangeStream
+{% endhighlight %}
+
+## Generate events into ProfileChangeStream topic
+
+Use generate-kafka-events from [Samza tools](samza-tools.html) to generate 
events into the ProfileChangeStream
+
+{% highlight bash %}
+cd samza-tools-<version>
+./scripts/generate-kafka-events.sh -t ProfileChangeStream -e ProfileChange
+{% endhighlight %}
+
+## Using Samza SQL Console to run Samza sql on your local machine
+
+Below are some of the sql queries that you can execute using the 
samza-sql-console tool from [Samza tools](samza-tools.html) package.
+
+{% highlight bash %}
+# This command just prints out all the events in the Kafka topic 
ProfileChangeStream into console output as a json serialized payload.
+./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select * 
from kafka.ProfileChangeStream"
+
+# This command prints out the fields that are selected into the console output 
as a json serialized payload.
+./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select 
Name, OldCompany, NewCompany from kafka.ProfileChangeStream"
+
+# This command showcases the RegexMatch udf and filtering capabilities.
+./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select 
Name as __key__, Name, NewCompany, RegexMatch('.*soft', OldCompany) from 
kafka.ProfileChangeStream where NewCompany = 'LinkedIn'"
+{% endhighlight %}
+
+
+# Running Samza SQL on YARN
+
+The [hello-samza](https://github.com/apache/samza-hello-samza) project is an 
example project designed to help you run your first Samza application. It has 
examples of applications using the low level task API, high level API as well 
as Samza SQL.
+
+This tutorial demonstrates a simple Samza application that uses SQL to perform 
stream processing.
+
+## Get the hello-samza Code and Start the grid
+
+Please follow the instructions from 
[hello-samza-high-level-yarn](hello-samza-high-level-yarn.html) on how to build 
the hello-samza repository and start the yarn grid. 
+
+## Create the topic and generate Kafka events
+
+Please follow the steps in the section "Create ProfileChangeStream Kafka 
topic" and "Generate events into ProfileChangeStream topic" above.
+
+## Build a Samza Application Package
+
+Before you can run a Samza application, you need to build a package for it. 
Please follow the instructions from 
[hello-samza-high-level-yarn](hello-samza-high-level-yarn.html) on how to build 
the hello-samza application package.
+
+## Run a Samza Application
+
+After you've built your Samza package, you can start the app on the grid using 
the run-app.sh script.
+
+{% highlight bash %}
+./deploy/samza/bin/run-app.sh 
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory 
--config-path=file://$PWD/deploy/samza/config/page-view-filter-sql.properties
+{% endhighlight %}
+
+The app executes the following SQL command :
+{% highlight sql %}
+insert into kafka.NewLinkedInEmployees select Name from ProfileChangeStream 
where NewCompany = 'LinkedIn'
+{% endhighlight %}
+
+This SQL performs the following
+
+1. Consumes the Kafka topic ProfileChangeStream which contains the Avro 
serialized ProfileChangeEvent(s) 
+2. Deserializes the events and filters out only the profile change events 
where NewCompany = 'LinkedIn' i.e. Members who have moved to LinkedIn.
+3. Writes the Avro serialized event that contains the Id and Name of those 
profiles to Kafka topic NewLinkedInEmployees.
+
+
+Give the job a minute to startup, and then tail the Kafka topic:
+
+{% highlight bash %}
+./deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181 
--topic NewLinkedInEmployees
+{% endhighlight %}
+
+
+Congratulations! You've now setup a local grid that includes YARN, Kafka, and 
ZooKeeper, and run a Samza SQL application on it.
+
+## Shutdown and cleanup
+
+To shutdown the app, use the same _run-app.sh_ script with an extra 
_--operation=kill_ argument
+{% highlight bash %}
+./deploy/samza/bin/run-app.sh 
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory 
--config-path=file://$PWD/deploy/samza/config/page-view-filter-sql.properties 
--operation=kill
+{% endhighlight %}
+
+Please follow the instructions from [Hello Samza High Level API - YARN 
Deployment](hello-samza-high-level-yarn.html) on how to shutdown and cleanup 
the app.

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/docs/learn/tutorials/versioned/samza-tools.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/samza-tools.md 
b/docs/learn/tutorials/versioned/samza-tools.md
new file mode 100644
index 0000000..6e4ee71
--- /dev/null
+++ b/docs/learn/tutorials/versioned/samza-tools.md
@@ -0,0 +1,109 @@
+---
+layout: page
+title: How to use Samza tools
+---
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
+# Get Samza tools
+
+Please visit the [Download page](/startup/download) to download the Samza 
tools package
+
+{% highlight bash %}
+tar -xvzf samza-tools-*.tgz
+cd samza-tools-<version>
+{% endhighlight %}
+
+
+# Using Samza tools
+
+
+## Generate kafka events
+
+
+Generate kafka events tool is used to insert avro serialized events into kafka 
topics. Right now it can insert two types of events 
[PageViewEvent](https://github.com/apache/samza/blob/master/samza-tools/src/main/java/org/com/linkedin/samza/tools/schemas/PageViewEvent.avsc)
 and 
[ProfileChangeEvent](https://github.com/apache/samza/blob/master/samza-tools/src/main/java/org/com/linkedin/samza/tools/schemas/ProfileChangeEvent.avsc)
+
+Before you can generate kafka events, Please follow instructions 
[here](http://kafka.apache.org/quickstart) to start the zookeeper and kafka 
server on your local machine.
+
+You can follow below instructions on how to use Generate kafka events tool.
+
+{% highlight bash %}
+
+# Usage of the tool
+
+./scripts/generate-kafka-events.sh
+usage: Error: Missing required options: t, e
+              generate-kafka-events.sh
+ -b,--broker <BROKER>               Kafka broker endpoint Default 
(localhost:9092).
+ -n,--numEvents <NUM_EVENTS>        Number of events to be produced, 
+                                    Default - Produces events continuously 
every second.
+ -p,--partitions <NUM_PARTITIONS>   Number of partitions in the topic,
+                                    Default (4).
+ -t,--topic <TOPIC_NAME>            Name of the topic to write events to.
+ -e,--eventtype <EVENT_TYPE>        Type of the event values can be 
(PageView|ProfileChange). 
+
+
+# Example command to generate 100 events of type PageViewEvent into topic 
named PageViewStream
+
+ ./scripts/generate-kafka-events.sh -t PageViewStream -e PageView -n 100
+
+
+# Example command to generate ProfileChange events continuously into topic 
named ProfileChangeStream
+
+ ./scripts/generate-kafka-events.sh -t ProfileChangeStream -e ProfileChange 
+
+{% endhighlight %}
+
+## Samza SQL console tool
+
+Once you have generated the events into the Kafka topic, you can use the 
samza-sql-console tool to perform processing on the events published into the 
Kafka topic.
+
+There are two ways to use the tool -
+
+1. You can either pass the sql statement directly as an argument to the tool. 
+2. You can write the sql statement(s) into a file and pass the sql file as an 
argument to the tool.
+
+Second option allows you to execute multiple sql statements, whereas the first 
one lets you execute one at a time.
+
+Samza SQL requires all the events in a topic to conform to a uniform schema. It 
also needs access to the schema corresponding to the events in a topic. 
Typically in an organization, there is a deployment of a schema registry which 
maps topics to schemas. 
+
+In the absence of a schema registry, the Samza SQL console tool uses a 
convention to identify the schemas associated with a topic: if the topic name 
contains the string "page", it assumes the topic has PageViewEvents; otherwise, 
ProfileChangeEvents. 
+
+{% highlight bash %}
+
+# Usage of the tool
+
+ ./scripts/samza-sql-console.sh
+usage: Error: One of the (f or s) options needs to be set
+              samza-sql-console.sh
+ -f,--file <SQL_FILE>   Path to the SQL file to execute.
+ -s,--sql <SQL_STMT>    SQL statement to execute.
+
+# Example command to filter out all the users who have moved to LinkedIn
+
+./scripts/samza-sql-console.sh --sql "Insert into log.consoleOutput select 
Name, OldCompany from kafka.ProfileChangeStream where NewCompany = 'LinkedIn'"
+
+{% endhighlight %}
+
+You can run below sql commands using Samza sql console. Please make sure you 
are running generate-kafka-events tool to generate events into 
ProfileChangeStream before running the below command.
+
+{% highlight bash %}
+./scripts/samza-sql-console.sh --sql "Insert into log.consoleOutput select 
Name, OldCompany from kafka.ProfileChangeStream where NewCompany = 'LinkedIn'"
+
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/docs/startup/download/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/download/index.md b/docs/startup/download/index.md
index b39f90f..5dcf380 100644
--- a/docs/startup/download/index.md
+++ b/docs/startup/download/index.md
@@ -27,6 +27,12 @@ If you just want to play around with Samza for the first 
time, go to [Hello Samz
 
 Starting from 2016, Samza will begin requiring JDK8 or higher. Please see 
[this mailing list 
thread](http://mail-archives.apache.org/mod_mbox/samza-dev/201610.mbox/%3CCAHUevGGnOQD_VmLWEdpFNq3Lv%2B6gQQmw_JKx9jDr5Cw%2BxFfGtQ%40mail.gmail.com%3E)
 for details on this decision.
 
+### Samza Tools
+ 
+ The Samza tools package contains command line tools that users can run to use 
Samza and its input/output systems. 
+ 
+ * [samza-tools-0.14.0.tgz](tbd)
+
 ### Source Releases
 
  * 
[samza-sources-0.13.1.tgz](http://www.apache.org/dyn/closer.lua/samza/0.13.1)
@@ -40,6 +46,7 @@ Starting from 2016, Samza will begin requiring JDK8 or 
higher. Please see [this
  * 
[samza-sources-0.8.0-incubating.tgz](https://archive.apache.org/dist/incubator/samza/0.8.0-incubating)
  * 
[samza-sources-0.7.0-incubating.tgz](https://archive.apache.org/dist/incubator/samza/0.7.0-incubating)
 
+
 ### Maven
 
 All Samza JARs are published through [Apache's Maven 
repository](https://repository.apache.org/content/groups/public/org/apache/samza/).

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 4f467ab..20a1d56 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -20,6 +20,7 @@
   apacheCommonsCollections4Version = "4.0"
   avroVersion = "1.7.0"
   calciteVersion = "1.14.0"
+  commonsCliVersion = "1.2"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
   commonsHttpClientVersion = "3.1"

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 1c17295..ab46a98 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -167,7 +167,11 @@ public class AvroRelConverter implements SamzaRelConverter 
{
 
   @Override
   public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage 
relMessage) {
-    GenericRecord record = new GenericData.Record(this.avroSchema);
+    return convertToSamzaMessage(relMessage, this.avroSchema);
+  }
+
+  protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage 
relMessage, Schema avroSchema) {
+    GenericRecord record = new GenericData.Record(avroSchema);
     List<String> fieldNames = relMessage.getFieldNames();
     List<Object> values = relMessage.getFieldValues();
     for (int index = 0; index < fieldNames.size(); index++) {

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/config/eh-consumer-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-tools/config/eh-consumer-log4j.xml 
b/samza-tools/config/eh-consumer-log4j.xml
new file mode 100644
index 0000000..d971512
--- /dev/null
+++ b/samza-tools/config/eh-consumer-log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" 
"log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/";>
+  <appender name="fileAppender" class="org.apache.log4j.FileAppender">
+    <param name="File"   value="./eh-consumer.log" />
+    <param name="Append" value="false" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p 
[%c{1}:%L] - %m%n"/>
+    </layout>
+  </appender>
+  <root>
+    <priority value ="info" />
+    <appender-ref ref="fileAppender" />
+  </root>
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/config/generate-kafka-events-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-tools/config/generate-kafka-events-log4j.xml 
b/samza-tools/config/generate-kafka-events-log4j.xml
new file mode 100644
index 0000000..98c7a28
--- /dev/null
+++ b/samza-tools/config/generate-kafka-events-log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" 
"log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/";>
+  <appender name="fileAppender" class="org.apache.log4j.FileAppender">
+    <param name="File"   value="./generate-kafka-events.log" />
+    <param name="Append" value="false" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p 
[%c{1}:%L] - %m%n"/>
+    </layout>
+  </appender>
+  <root>
+    <priority value ="info" />
+    <appender-ref ref="fileAppender" />
+  </root>
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/config/samza-sql-console-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-tools/config/samza-sql-console-log4j.xml 
b/samza-tools/config/samza-sql-console-log4j.xml
new file mode 100644
index 0000000..6b00381
--- /dev/null
+++ b/samza-tools/config/samza-sql-console-log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" 
"log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/";>
+  <appender name="fileAppender" class="org.apache.log4j.FileAppender">
+    <param name="File"   value="./samza-sql-console.log" />
+    <param name="Append" value="false" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p 
[%c{1}:%L] - %m%n"/>
+    </layout>
+  </appender>
+  <root>
+    <priority value ="info" />
+    <appender-ref ref="fileAppender" />
+  </root>
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/scripts/eh-consumer.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/eh-consumer.sh 
b/samza-tools/scripts/eh-consumer.sh
new file mode 100755
index 0000000..363e028
--- /dev/null
+++ b/samza-tools/scripts/eh-consumer.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [ `uname` == 'Linux' ];
+then
+  base_dir=$(readlink -f $(dirname $0))
+else
+  base_dir=$(realpath $(dirname $0))
+fi
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+    export 
LOG4J_OPTS="-Dlog4j.configuration=file://$base_dir/../config/eh-consumer-log4j.xml"
+fi
+
+if [ "x$HEAP_OPTS" = "x" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+exec $base_dir/run-class.sh org.apache.samza.tools.EventHubConsoleConsumer "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/scripts/generate-kafka-events.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/generate-kafka-events.sh 
b/samza-tools/scripts/generate-kafka-events.sh
new file mode 100755
index 0000000..858ade6
--- /dev/null
+++ b/samza-tools/scripts/generate-kafka-events.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [ `uname` == 'Linux' ];
+then
+  base_dir=$(readlink -f $(dirname $0))
+else
+  base_dir=$(realpath $(dirname $0))
+fi
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+    export 
LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/generate-kafka-events-log4j.xml"
+fi
+
+if [ "x$HEAP_OPTS" = "x" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+exec $base_dir/run-class.sh org.apache.samza.tools.GenerateKafkaEvents "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/scripts/samza-sql-console.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/samza-sql-console.sh 
b/samza-tools/scripts/samza-sql-console.sh
new file mode 100755
index 0000000..39d6930
--- /dev/null
+++ b/samza-tools/scripts/samza-sql-console.sh
@@ -0,0 +1,34 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Resolve the absolute directory containing this script.
# readlink -f is GNU-specific (Linux); use realpath elsewhere.
if [ "$(uname)" = 'Linux' ]; then
  base_dir=$(readlink -f "$(dirname "$0")")
else
  base_dir=$(realpath "$(dirname "$0")")
fi

# Default log4j configuration unless the caller already provided one.
if [ -z "$LOG4J_OPTS" ]; then
  export LOG4J_OPTS="-Dlog4j.configuration=file://$base_dir/../config/samza-sql-console-log4j.xml"
fi

# Default JVM heap settings unless the caller already provided them.
if [ -z "$HEAP_OPTS" ]; then
  export HEAP_OPTS="-Xmx1G -Xms1G"
fi

exec "$base_dir/run-class.sh" org.apache.samza.tools.SamzaSqlConsole "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/CommandLineHelper.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/CommandLineHelper.java 
b/samza-tools/src/main/java/org/apache/samza/tools/CommandLineHelper.java
new file mode 100644
index 0000000..3cabce3
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/CommandLineHelper.java
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Simple command line helper util.
+ */
+public class CommandLineHelper {
+  public static Option createOption(String shortOpt, String longOpt, String 
argName, boolean required,
+      String description) {
+    OptionBuilder optionBuilder = 
OptionBuilder.withLongOpt(longOpt).withDescription(description).isRequired(required);
+
+    if (!StringUtils.isEmpty(argName)) {
+
+      optionBuilder = optionBuilder.withArgName(argName).hasArg();
+    }
+
+    return optionBuilder.create(shortOpt);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
new file mode 100644
index 0000000..87abc76
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
@@ -0,0 +1,126 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Console logging System factory that just writes the messages to the console 
output.
+ * This system factory is useful when the user wants to print the output of 
the stream processing to console.
+ */
+public class ConsoleLoggingSystemFactory implements SystemFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConsoleLoggingSystemFactory.class);
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
+    return new LoggingSystemProducer();
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SimpleSystemAdmin(config);
+  }
+
+  private class LoggingSystemProducer implements SystemProducer {
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void register(String source) {
+      LOG.info("Registering source" + source);
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      String msg = String.format("OutputStream:%s Key:%s Value:%s", 
envelope.getSystemStream(), envelope.getKey(),
+          new String((byte[]) envelope.getMessage()));
+      LOG.info(msg);
+
+      if (envelope.getKey() != null) {
+        System.out.println(String.format("Key:%s Value:%s", envelope.getKey(),
+            new String((byte[]) envelope.getMessage())));
+      } else {
+        System.out.println(new String((byte[]) envelope.getMessage()));
+      }
+    }
+
+    @Override
+    public void flush(String source) {
+    }
+  }
+
+  public static class SimpleSystemAdmin implements SystemAdmin {
+
+    public SimpleSystemAdmin(Config config) {
+    }
+
+    @Override
+    public Map<SystemStreamPartition, String> 
getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+      return 
offsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, null));
+    }
+
+    @Override
+    public Map<String, SystemStreamMetadata> 
getSystemStreamMetadata(Set<String> streamNames) {
+      return streamNames.stream()
+          .collect(Collectors.toMap(Function.identity(), streamName -> new 
SystemStreamMetadata(streamName,
+              Collections.singletonMap(new Partition(0),
+                  new SystemStreamMetadata.SystemStreamPartitionMetadata(null, 
null, null)))));
+    }
+
+    @Override
+    public Integer offsetComparator(String offset1, String offset2) {
+      if (offset1 == null) {
+        return offset2 == null ? 0 : -1;
+      } else if (offset2 == null) {
+        return 1;
+      }
+      return offset1.compareTo(offset2);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java 
b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
new file mode 100644
index 0000000..096e12d
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java
@@ -0,0 +1,120 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools;
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+/**
+ * Tool to read events from Microsoft Azure event hubs.
+ */
+public class EventHubConsoleConsumer {
+
+  private static final String OPT_SHORT_EVENTHUB_NAME = "e";
+  private static final String OPT_LONG_EVENTHUB_NAME = "ehname";
+  private static final String OPT_ARG_EVENTHUB_NAME = "EVENTHUB_NAME";
+  private static final String OPT_DESC_EVENTHUB_NAME = "Name of the event 
hub.";
+
+  private static final String OPT_SHORT_NAMESPACE = "n";
+  private static final String OPT_LONG_NAMESPACE = "namespace";
+  private static final String OPT_ARG_NAMESPACE = "EVENTHUB_NAMESPACE";
+  private static final String OPT_DESC_NAMESPACE = "Namespace of the event 
hub.";
+
+  private static final String OPT_SHORT_KEY_NAME = "k";
+  private static final String OPT_LONG_KEY_NAME = "key";
+  private static final String OPT_ARG_KEY_NAME = "KEY_NAME";
+  private static final String OPT_DESC_KEY_NAME = "Name of the key.";
+
+  private static final String OPT_SHORT_TOKEN = "t";
+  private static final String OPT_LONG_TOKEN = "token";
+  private static final String OPT_ARG_TOKEN = "TOKEN";
+  private static final String OPT_DESC_TOKEN = "Token corresponding to the 
key.";
+
+  public static void main(String[] args)
+      throws ServiceBusException, IOException, ExecutionException, 
InterruptedException {
+    Options options = new Options();
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_EVENTHUB_NAME, 
OPT_LONG_EVENTHUB_NAME, OPT_ARG_EVENTHUB_NAME, true,
+        OPT_DESC_EVENTHUB_NAME));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_NAMESPACE, 
OPT_LONG_NAMESPACE, OPT_ARG_NAMESPACE, true, OPT_DESC_NAMESPACE));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_KEY_NAME, OPT_LONG_KEY_NAME, 
OPT_ARG_KEY_NAME, true, OPT_DESC_KEY_NAME));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_TOKEN, OPT_LONG_TOKEN, 
OPT_ARG_TOKEN, true, OPT_DESC_TOKEN));
+
+    CommandLineParser parser = new BasicParser();
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(options, args);
+    } catch (Exception e) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(String.format("Error: %s%neh-console-consumer.sh", 
e.getMessage()), options);
+      return;
+    }
+
+    String ehName = cmd.getOptionValue(OPT_SHORT_EVENTHUB_NAME);
+    String namespace = cmd.getOptionValue(OPT_SHORT_NAMESPACE);
+    String keyName = cmd.getOptionValue(OPT_SHORT_KEY_NAME);
+    String token = cmd.getOptionValue(OPT_SHORT_TOKEN);
+
+    consumeEvents(ehName, namespace, keyName, token);
+  }
+
+  private static void consumeEvents(String ehName, String namespace, String 
keyName, String token)
+      throws ServiceBusException, IOException, ExecutionException, 
InterruptedException {
+    ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespace, 
ehName, keyName, token);
+
+    EventHubClient client = 
EventHubClient.createFromConnectionStringSync(connStr.toString());
+
+    EventHubRuntimeInformation runTimeInfo = 
client.getRuntimeInformation().get();
+    int numPartitions = runTimeInfo.getPartitionCount();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      PartitionReceiver receiver =
+          
client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, 
String.valueOf(partition),
+              PartitionReceiver.START_OF_STREAM);
+      receiver.receive(10).handle((records, throwable) -> 
handleComplete(receiver, records, throwable));
+    }
+  }
+
+  private static Object handleComplete(PartitionReceiver receiver, 
Iterable<EventData> records, Throwable throwable) {
+    for (EventData record : records) {
+      System.out.println(
+          String.format("Partition %s, Event %s", receiver.getPartitionId(), 
new String(record.getBytes())));
+    }
+
+    receiver.receive(10).handle((r, t) -> handleComplete(receiver, r, t));
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java 
b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
new file mode 100644
index 0000000..6c30eee
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java
@@ -0,0 +1,205 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools;
+
+import org.apache.samza.tools.schemas.PageViewEvent;
+import org.apache.samza.tools.schemas.ProfileChangeEvent;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenerateKafkaEvents {
+
+  private static final String OPT_SHORT_TOPIC_NAME = "t";
+  private static final String OPT_LONG_TOPIC_NAME = "topic";
+  private static final String OPT_ARG_TOPIC_NAME = "TOPIC_NAME";
+  private static final String OPT_DESC_TOPIC_NAME = "Name of the topic to 
write events to.";
+
+  private static final String OPT_SHORT_BROKER = "b";
+  private static final String OPT_LONG_BROKER = "broker";
+  private static final String OPT_ARG_BROKER = "BROKER";
+  private static final String OPT_DESC_BROKER = "Kafka broker endpoint.";
+  private static final String DEFAULT_BROKER = "localhost:9092";
+
+  private static final String OPT_SHORT_NUM_EVENTS = "n";
+  private static final String OPT_LONG_NUM_EVENTS = "numEvents";
+  private static final String OPT_ARG_NUM_EVENTS = "NUM_EVENTS";
+  private static final String OPT_DESC_NUM_EVENTS = "Number of events to be 
produced.";
+
+  private static final String OPT_SHORT_EVENT_TYPE = "e";
+  private static final String OPT_LONG_EVENT_TYPE = "eventtype";
+  private static final String OPT_ARG_EVENT_TYPE = "EVENT_TYPE";
+  private static final String OPT_DESC_EVENT_TYPE =
+      "Type of the event (PageView|ProfileChange) Default(ProfileChange).";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GenerateKafkaEvents.class);
+  private static RandomValueGenerator randValueGenerator;
+
+  private static String[] companies =
+      new String[]{"Microsoft", "LinkedIn", "Google", "Facebook", "Amazon", 
"Apple", "Twitter", "Snap"};
+
+  private static final String PAGEVIEW_EVENTTYPE = "pageview";
+
+  public static void main(String[] args) throws UnsupportedEncodingException, 
InterruptedException {
+    randValueGenerator = new RandomValueGenerator(System.currentTimeMillis());
+    Options options = new Options();
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_TOPIC_NAME, 
OPT_LONG_TOPIC_NAME, OPT_ARG_TOPIC_NAME, true, OPT_DESC_TOPIC_NAME));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_BROKER, OPT_LONG_BROKER, 
OPT_ARG_BROKER, false, OPT_DESC_BROKER));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_NUM_EVENTS, 
OPT_LONG_NUM_EVENTS, OPT_ARG_NUM_EVENTS, false, OPT_DESC_NUM_EVENTS));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_EVENT_TYPE, 
OPT_LONG_EVENT_TYPE, OPT_ARG_EVENT_TYPE, false, OPT_DESC_EVENT_TYPE));
+
+    CommandLineParser parser = new BasicParser();
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(options, args);
+    } catch (Exception e) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(String.format("Error: %s%ngenerate-events.sh", 
e.getMessage()), options);
+      return;
+    }
+
+    String topicName = cmd.getOptionValue(OPT_SHORT_TOPIC_NAME);
+    String broker = cmd.getOptionValue(OPT_SHORT_BROKER, DEFAULT_BROKER);
+    long numEvents = Long.parseLong(cmd.getOptionValue(OPT_SHORT_NUM_EVENTS, 
String.valueOf(Long.MAX_VALUE)));
+    String eventType = cmd.getOptionValue(OPT_SHORT_EVENT_TYPE);
+    generateEvents(broker, topicName, eventType, numEvents);
+  }
+
+  private static void generateEvents(String brokers, String topicName, String 
eventType, long numEvents)
+      throws UnsupportedEncodingException, InterruptedException {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", brokers);
+    props.put("retries", 100);
+    props.put("batch.size", 16384);
+    props.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
+    props.put("value.serializer", 
ByteArraySerializer.class.getCanonicalName());
+
+    Function<Integer, Pair<String, byte[]>> eventGenerator;
+    if (eventType.toLowerCase().contains(PAGEVIEW_EVENTTYPE)) {
+      eventGenerator = GenerateKafkaEvents::generatePageViewEvent;
+    } else {
+      eventGenerator = GenerateKafkaEvents::generateProfileChangeEvent;
+    }
+
+    boolean doSleep = false;
+    // sleep only when the events have to be produced continuously.
+    if (numEvents == Long.MAX_VALUE) {
+      doSleep = true;
+    }
+
+    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
+      for (int index = 0; index < numEvents; index++) {
+        final int finalIndex = 0;
+        Pair<String, byte[]> record = eventGenerator.apply(index);
+        producer.send(new ProducerRecord<>(topicName, 
record.getLeft().getBytes("UTF-8"), record.getRight()),
+            (metadata, exception) -> {
+              if (exception == null) {
+                LOG.info("send completed for event {} at offset {}", 
finalIndex, metadata.offset());
+              } else {
+                throw new RuntimeException("Failed to send message.", 
exception);
+              }
+            });
+        System.out.println(String.format("Published event %d to topic %s", 
index, topicName));
+        if (doSleep) {
+          Thread.sleep(1000);
+        }
+      }
+
+      producer.flush();
+    }
+  }
+
+  private static Pair<String, byte[]> generateProfileChangeEvent(Integer 
index) {
+    ProfileChangeEvent event = new ProfileChangeEvent();
+    String name = randValueGenerator.getNextString(10, 20);
+    event.Name = name;
+    event.NewCompany = companies[randValueGenerator.getNextInt(0, 
companies.length - 1)];
+    event.OldCompany = companies[randValueGenerator.getNextInt(0, 
companies.length - 1)];
+    event.ProfileChangeTimestamp = System.currentTimeMillis();
+    byte[] value;
+    try {
+      value = encodeAvroSpecificRecord(ProfileChangeEvent.class, event);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return new ImmutablePair<>(name, value);
+  }
+
+  /**
+   * Encode an Avro record into byte array
+   *
+   * @param clazz The class type of the Avro record
+   * @param record the instance of the avro record
+   * @param <T> The type of the avro record.
+   * @return encoded bytes
+   * @throws java.io.IOException
+   */
+  public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) 
throws IOException {
+    DatumWriter<T> msgDatumWriter = new SpecificDatumWriter<>(clazz);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+    msgDatumWriter.write(record, encoder);
+    encoder.flush();
+    return os.toByteArray();
+  }
+
+  private static Pair<String, byte[]> generatePageViewEvent(int index) {
+    PageViewEvent event = new PageViewEvent();
+    String name = randValueGenerator.getNextString(10, 20);
+    event.id = randValueGenerator.getNextInt();
+    event.Name = name;
+    event.ViewerName = randValueGenerator.getNextString(10, 20);
+    event.ProfileViewTimestamp = System.currentTimeMillis();
+    byte[] value;
+    try {
+      value = encodeAvroSpecificRecord(PageViewEvent.class, event);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return new ImmutablePair<>(name, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/RandomValueGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/RandomValueGenerator.java 
b/samza-tools/src/main/java/org/apache/samza/tools/RandomValueGenerator.java
new file mode 100644
index 0000000..18e0316
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/RandomValueGenerator.java
@@ -0,0 +1,87 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools;
+
+import java.util.Random;
+
+
/**
 * Simple utility to generate random values of various types from a seeded
 * {@link Random}, so that a given seed always reproduces the same sequence.
 */
public class RandomValueGenerator {

  // Alphabet used for random string generation: letters and digits only.
  // (Previously a mutable instance field; this is a constant.)
  private static final String VALID_CHARS =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

  private final Random rand;

  /**
   * @param seed seed for the underlying PRNG; always required so that failed
   *             tests can be reproduced deterministically
   */
  public RandomValueGenerator(long seed) {
    rand = new Random(seed);
  }

  /** Returns a uniformly distributed int over the full int range. */
  public int getNextInt() {
    return rand.nextInt();
  }

  /**
   * Returns a uniformly distributed int in the inclusive range [min, max].
   *
   * @throws IllegalArgumentException if max is less than min
   */
  public int getNextInt(int min, int max) {
    if (max == min) {
      return min;
    }
    if (max < min) {
      // Previously this relied on Random.nextInt rejecting a non-positive bound;
      // validate explicitly with a clearer message (same exception type).
      throw new IllegalArgumentException(String.format("max (%d) must not be less than min (%d)", max, min));
    }

    // +1 makes the range inclusive of both min and max.
    return rand.nextInt(max - min + 1) + min;
  }

  /**
   * Returns a random alphanumeric string whose length lies in [min, max] inclusive.
   */
  public String getNextString(int min, int max) {
    int length = getNextInt(min, max);

    StringBuilder strbld = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      strbld.append(VALID_CHARS.charAt(rand.nextInt(VALID_CHARS.length())));
    }

    return strbld.toString();
  }

  /** Returns a uniformly distributed double in [0, 1). */
  public double getNextDouble() {
    return rand.nextDouble();
  }

  /** Returns a uniformly distributed float in [0, 1). */
  public float getNextFloat() {
    return rand.nextFloat();
  }

  /**
   * Returns a non-negative long. Long.MIN_VALUE, whose absolute value cannot be
   * represented, is mapped to 0.
   */
  public long getNextLong() {
    long randomLong = rand.nextLong();

    return randomLong == Long.MIN_VALUE ? 0 : Math.abs(randomLong);
  }

  /** Returns a random boolean. */
  public boolean getNextBoolean() {
    return rand.nextBoolean();
  }

  /** Returns a random byte array whose length lies in [0, maxBytesLength] inclusive. */
  public byte[] getNextBytes(int maxBytesLength) {
    byte[] bytes = new byte[this.getNextInt(0, maxBytesLength)];
    rand.nextBytes(bytes);
    return bytes;
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java 
b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
new file mode 100644
index 0000000..9803117
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -0,0 +1,188 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools;
+
+import com.google.common.base.Joiner;
+import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;
+import org.apache.samza.tools.avro.AvroSerDeFactory;
+import org.apache.samza.tools.json.JsonRelConverterFactory;
+import org.apache.samza.tools.schemas.PageViewEvent;
+import org.apache.samza.tools.schemas.ProfileChangeEvent;
+import org.apache.samza.tools.udf.RegexMatchUdf;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.fn.FlattenUdf;
+import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SqlFileParser;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.kafka.KafkaSystemFactory;
+
+
/**
 * Command line tool to execute Samza SQL statements, passed either directly on
 * the command line ("-s") or read from a file ("-f"). The statements run as a
 * local Samza job (passthrough coordinator, single container grouper) wired to
 * read avro events from a local kafka cluster and to write results either back
 * to kafka or to the console via the "log" system.
 */
public class SamzaSqlConsole {

  // "-f <SQL_FILE>": path to a file containing SQL statements.
  private static final String OPT_SHORT_SQL_FILE = "f";
  private static final String OPT_LONG_SQL_FILE = "file";
  private static final String OPT_ARG_SQL_FILE = "SQL_FILE";
  private static final String OPT_DESC_SQL_FILE = "Path to the SQL file to execute.";

  // "-s <SQL_STMT>": a single SQL statement given inline.
  private static final String OPT_SHORT_SQL_STMT = "s";
  private static final String OPT_LONG_SQL_STMT = "sql";
  private static final String OPT_ARG_SQL_STMT = "SQL_STMT";
  private static final String OPT_DESC_SQL_STMT = "SQL statement to execute.";

  // Samza system names used in the generated configuration below.
  private static final String SAMZA_SYSTEM_KAFKA = "kafka";
  private static final String SAMZA_SYSTEM_LOG = "log";

  /**
   * Parses the command line, validates that exactly one source of SQL ("-f" or
   * "-s") was supplied, and executes the resulting statements.
   */
  public static void main(String[] args) {
    Options options = new Options();
    options.addOption(
        CommandLineHelper.createOption(OPT_SHORT_SQL_FILE, OPT_LONG_SQL_FILE, OPT_ARG_SQL_FILE, false, OPT_DESC_SQL_FILE));
    options.addOption(
        CommandLineHelper.createOption(OPT_SHORT_SQL_STMT, OPT_LONG_SQL_STMT, OPT_ARG_SQL_STMT, false, OPT_DESC_SQL_STMT));

    CommandLineParser parser = new BasicParser();
    CommandLine cmd;
    try {
      cmd = parser.parse(options, args);
      // At least one of the two SQL sources must be present.
      if (!cmd.hasOption(OPT_SHORT_SQL_STMT) && !cmd.hasOption(OPT_SHORT_SQL_FILE)) {
        throw new Exception(
            String.format("One of the (%s or %s) options needs to be set", OPT_SHORT_SQL_FILE, OPT_SHORT_SQL_STMT));
      }
    } catch (Exception e) {
      // Print usage (with the parse/validation error) and exit without running.
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(String.format("Error: %s%nsamza-sql-console.sh", e.getMessage()), options);
      return;
    }

    List<String> sqlStmts;

    // "-f" wins over "-s" when both are given.
    if (cmd.hasOption(OPT_SHORT_SQL_FILE)) {
      String sqlFile = cmd.getOptionValue(OPT_SHORT_SQL_FILE);
      sqlStmts = SqlFileParser.parseSqlFile(sqlFile);
    } else {
      String sql = cmd.getOptionValue(OPT_SHORT_SQL_STMT);
      System.out.println("Executing sql " + sql);
      sqlStmts = Collections.singletonList(sql);
    }

    executeSql(sqlStmts);
  }

  /**
   * Runs the given SQL statements as a local Samza SQL application and blocks
   * until the job finishes.
   */
  public static void executeSql(List<String> sqlStmts) {
    Map<String, String> staticConfigs = fetchSamzaSqlConfig();
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
    runner.runAndWaitForFinish();
  }

  /**
   * Builds the static configuration for the console's Samza SQL job: a
   * standalone job, config-based source/UDF resolvers, avro/json rel
   * converters, and two systems — "kafka" (avro over a local kafka/zookeeper)
   * and "log" (json to the console logger).
   */
  public static Map<String, String> fetchSamzaSqlConfig() {
    HashMap<String, String> staticConfigs = new HashMap<>();

    // Standalone job: passthrough coordinator, everything in one container.
    staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
    staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

    // Source resolver named "config", backed by ConfigBasedSourceResolverFactory.
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config");
    String configSourceResolverDomain =
        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
    staticConfigs.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
        ConfigBasedSourceResolverFactory.class.getName());

    // UDF resolver named "config", exposing the RegexMatch and Flatten UDFs.
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
    String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
    staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
        ConfigBasedUdfResolver.class.getName());
    staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES,
        Joiner.on(",").join(RegexMatchUdf.class.getName(), FlattenUdf.class.getName()));

    // Serde registry: string keys and avro message payloads.
    staticConfigs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
    staticConfigs.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName());
    staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, ProfileChangeEvent.SCHEMA$.toString());

    // "kafka" system: local broker/zookeeper, avro messages, read from oldest.
    String kafkaSystemConfigPrefix =
        String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA);
    String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA);
    staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", KafkaSystemFactory.class.getName());
    staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string");
    staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro");
    staticConfigs.put(kafkaSystemConfigPrefix + "consumer.zookeeper.connect", "localhost:2181");
    staticConfigs.put(kafkaSystemConfigPrefix + "producer.bootstrap.servers", "localhost:9092");

    staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true");
    staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest");

    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "avro");
    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config");

    // "log" system: writes json-converted output to the console logger.
    String logSystemConfigPrefix =
        String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
    String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
    staticConfigs.put(logSystemConfigPrefix + "samza.factory", ConsoleLoggingSystemFactory.class.getName());
    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "json");
    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config");

    // Rel converters: "avro" (schema-generating) and "json".
    String avroSamzaToRelMsgConverterDomain =
        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");

    staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
        AvroSchemaGenRelConverterFactory.class.getName());

    String jsonSamzaToRelMsgConverterDomain =
        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "json");

    staticConfigs.put(jsonSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
        JsonRelConverterFactory.class.getName());

    // Config-based avro schema provider with schemas for the two sample streams.
    String configAvroRelSchemaProviderDomain =
        String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
    staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
        ConfigBasedAvroRelSchemaProviderFactory.class.getName());

    staticConfigs.put(
        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
            "kafka", "PageViewStream"), PageViewEvent.SCHEMA$.toString());

    staticConfigs.put(
        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
            "kafka", "ProfileChangeStream"), ProfileChangeEvent.SCHEMA$.toString());

    return staticConfigs;
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
new file mode 100644
index 0000000..198b84b
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools.avro;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Special form for AvroRelConverter that generates the avro schema on the 
output based on the
+ * fields in {@link SamzaSqlRelMessage} and uses the schema to serialize the 
output.
+ * This is useful to test out the SQL quickly when the destination system 
supports Avro serialized data,
+ * without having to manually author the avro schemas for various SQL queries.
+ */
+public class AvroSchemaGenRelConverter extends AvroRelConverter {
+
+  private final String streamName;
+  private Map<String, Schema> schemas = new HashMap<>();
+
+  public AvroSchemaGenRelConverter(SystemStream systemStream, 
AvroRelSchemaProvider schemaProvider, Config config) {
+    super(systemStream, schemaProvider, config);
+    streamName = systemStream.getStream();
+  }
+
+  @Override
+  public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage 
relMessage) {
+    Schema schema = computeSchema(streamName, relMessage);
+    return convertToSamzaMessage(relMessage, schema);
+  }
+
+  private Schema computeSchema(String streamName, SamzaSqlRelMessage 
relMessage) {
+    List<Schema.Field> keyFields = new ArrayList<>();
+    List<String> fieldNames = relMessage.getFieldNames();
+    List<Object> values = relMessage.getFieldValues();
+
+    for (int index = 0; index < fieldNames.size(); index++) {
+      if (fieldNames.get(index).equals(SamzaSqlRelMessage.KEY_NAME) || 
values.get(index) == null) {
+        continue;
+      }
+
+      Object value = values.get(index);
+      Schema avroType;
+      if (value instanceof GenericData.Record) {
+        avroType = ((GenericData.Record) value).getSchema();
+      } else {
+        avroType = ReflectData.get().getSchema(value.getClass());
+      }
+      keyFields.add(new Schema.Field(fieldNames.get(index), avroType, "", 
null));
+    }
+
+    Schema ks = Schema.createRecord(streamName, "", streamName + "_namespace", 
false);
+    ks.setFields(keyFields);
+    String schemaStr = ks.toString();
+    Schema schema;
+    // See whether we have a schema object corresponding to the schemaValue 
and reuse it.
+    // CachedSchemaRegistryClient doesn't like if we recreate schema objects.
+    if (schemas.containsKey(schemaStr)) {
+      schema = schemas.get(schemaStr);
+    } else {
+      schema = Schema.parse(schemaStr);
+      schemas.put(schemaStr, schema);
+    }
+
+    return schema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverterFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverterFactory.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverterFactory.java
new file mode 100644
index 0000000..cf8c568
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverterFactory.java
@@ -0,0 +1,43 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools.avro;
+
+import java.util.HashMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory for {@link AvroSchemaGenRelConverter}
+ */
+public class AvroSchemaGenRelConverterFactory implements 
SamzaRelConverterFactory {
+
+  private final HashMap<SystemStream, SamzaRelConverter> relConverters = new 
HashMap<>();
+
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider 
relSchemaProvider, Config config) {
+    return relConverters.computeIfAbsent(systemStream,
+        ss -> new AvroSchemaGenRelConverter(ss, (AvroRelSchemaProvider) 
relSchemaProvider, config));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
new file mode 100644
index 0000000..a052306
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+
+/**
+ * Avro SerDe that can be used to serialize or deserialize the avro {@link 
GenericRecord}.
+ */
+public class AvroSerDeFactory implements SerdeFactory {
+
+  public static String CFG_AVRO_SCHEMA = "serializers.avro.schema";
+
+  @Override
+  public Serde getSerde(String name, Config config) {
+    return new AvroSerDe(config);
+  }
+
+  private class AvroSerDe implements Serde {
+    private final Schema schema;
+
+    public AvroSerDe(Config config) {
+      schema = Schema.parse(config.get(CFG_AVRO_SCHEMA));
+    }
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+      GenericRecord record;
+      try {
+        record = genericRecordFromBytes(bytes, schema);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to deserialize the record", e);
+      }
+      return record;
+    }
+
+    @Override
+    public byte[] toBytes(Object o) {
+      GenericRecord record = (GenericRecord) o;
+      try {
+        return encodeAvroGenericRecord(schema, record);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to serialize the record", e);
+      }
+    }
+  }
+
+  private byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) 
throws IOException {
+    DatumWriter<IndexedRecord> msgDatumWriter = new 
GenericDatumWriter<>(schema);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+    msgDatumWriter.write(record, encoder);
+    encoder.flush();
+    return os.toByteArray();
+  }
+
+  private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) 
throws IOException {
+    BinaryDecoder binDecoder = 
DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
+    GenericDatumReader<T> reader = new GenericDatumReader<>(schema);
+    return reader.read(null, binDecoder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
new file mode 100644
index 0000000..de8dec8
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
@@ -0,0 +1,93 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.tools.json;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
+
+
+/**
+ * SamzaRelConverter that can convert {@link SamzaSqlRelMessage} to json 
string byte array.
+ */
+public class JsonRelConverterFactory implements SamzaRelConverterFactory {
+
+  ObjectMapper mapper = new ObjectMapper();
+
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider 
relSchemaProvider, Config config) {
+    return new JsonRelConverter();
+  }
+
+  public class JsonRelConverter implements SamzaRelConverter {
+
+    @Override
+    public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> kv) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage 
relMessage) {
+
+      String jsonValue;
+      ObjectNode node = mapper.createObjectNode();
+
+      List<String> fieldNames = relMessage.getFieldNames();
+      List<Object> values = relMessage.getFieldValues();
+
+      for (int index = 0; index < fieldNames.size(); index++) {
+        Object value = values.get(index);
+        if (value == null) {
+          continue;
+        }
+
+        // TODO limited support right now.
+        if (Long.class.isAssignableFrom(value.getClass())) {
+          node.put(fieldNames.get(index), (Long) value);
+        } else if (Integer.class.isAssignableFrom(value.getClass())) {
+          node.put(fieldNames.get(index), (Integer) value);
+        } else if (Double.class.isAssignableFrom(value.getClass())) {
+          node.put(fieldNames.get(index), (Double) value);
+        } else if (String.class.isAssignableFrom(value.getClass())) {
+          node.put(fieldNames.get(index), (String) value);
+        } else {
+          node.put(fieldNames.get(index), value.toString());
+        }
+      }
+      try {
+        jsonValue = mapper.writeValueAsString(node);
+      } catch (IOException e) {
+        throw new SamzaException("Error json serializing object", e);
+      }
+
+      return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.avsc
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.avsc 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.avsc
new file mode 100644
index 0000000..dea6a12
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.avsc
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "PageViewEvent",
+    "version" : 1,
+    "namespace": "com.linkedin.samza.tools.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "Record id.",
+            "type": ["null", "int"],
+            "default":null
+        },
+        {
+            "name": "Name",
+            "doc": "Name of the profile.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "ViewerName",
+            "doc": "Name of the person who viewed the profile.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "ProfileViewTimestamp",
+            "doc": "Time at which the profile was viewed.",
+            "type": ["null", "long"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.java 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.java
new file mode 100644
index 0000000..7cfbcb9
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/PageViewEvent.java
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.tools.schemas;
+
+@SuppressWarnings("all")
/**
 * Avro specific record for a page view event, generated from PageViewEvent.avsc.
 * Generated code — do not edit by hand; regenerate from the schema instead.
 */
@SuppressWarnings("all")
public class PageViewEvent extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"PageViewEvent\",\"namespace\":\"com.linkedin.samza.tools.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"doc\":\"Name of the profile.\",\"default\":null},{\"name\":\"ViewerName\",\"type\":[\"null\",\"string\"],\"doc\":\"Name of the person who viewed the profile.\",\"default\":null},{\"name\":\"ProfileViewTimestamp\",\"type\":[\"null\",\"long\"],\"doc\":\"Time at which the profile was viewed.\",\"default\":null}]}");
  /** Record id. */
  public java.lang.Integer id;
  /** Name of the profile. */
  public java.lang.CharSequence Name;
  /** Name of the person who viewed the profile. */
  public java.lang.CharSequence ViewerName;
  /** Time at which the profile was viewed. */
  public java.lang.Long ProfileViewTimestamp;
  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return id;
    case 1: return Name;
    case 2: return ViewerName;
    case 3: return ProfileViewTimestamp;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: id = (java.lang.Integer)value$; break;
    case 1: Name = (java.lang.CharSequence)value$; break;
    case 2: ViewerName = (java.lang.CharSequence)value$; break;
    case 3: ProfileViewTimestamp = (java.lang.Long)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
}

Reply via email to