STORM-2136 Write some examples to show the usage of Storm SQL

* This should be merged after STORM-2135 (hence after STORM-2125, too)
* NOTE: explain will be covered once explain feature is merged to master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5884d55f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5884d55f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5884d55f

Branch: refs/heads/master
Commit: 5884d55fd3f966a93b4c6b254b6bc698c9d8cef6
Parents: 43478ed
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Sun Oct 9 11:22:28 2016 +0900
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Tue Oct 11 11:38:57 2016 +0900

----------------------------------------------------------------------
 docs/storm-sql-example.md | 372 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 372 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5884d55f/docs/storm-sql-example.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql-example.md b/docs/storm-sql-example.md
new file mode 100644
index 0000000..544d1fe
--- /dev/null
+++ b/docs/storm-sql-example.md
@@ -0,0 +1,372 @@
+---
+title: Storm SQL example
+layout: documentation
+documentation: true
+---
+
+This page shows how to use Storm SQL through an example of processing Apache logs.
+It is written in a "how-to" style, so you can follow the steps and learn how to use Storm SQL step by step.
+
+## Preparation
+
+This page assumes that Apache ZooKeeper, Apache Storm, and Apache Kafka are installed locally, properly configured, and running.
+For convenience, this page assumes that Apache Kafka 0.10.0 is installed via `brew`.
+
+We'll use the tools below to prepare the JSON data which will be fed to the input data source.
+Since they're Python projects, this page assumes Python 2.7 with `pip` and `virtualenv` installed locally.
+If you're using Python 3, you may need to adjust some parts manually for compatibility while feeding data.
+
+* https://github.com/kiritbasu/Fake-Apache-Log-Generator
+* https://github.com/rory/apache-log-parser
+
+## Creating Topics
+
+In this page, we will use four topics: `apache-logs`, `apache-errorlogs`, `apache-slowlogs`, and `apache-slowlogs-aggregated`.
+Please create the topics according to your environment.
+
+For Apache Kafka 0.10.0 installed via `brew`:
+
+```
+kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
+kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
+kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
+kafka-topics --create --topic apache-slowlogs-aggregated --zookeeper localhost:2181 --replication-factor 1 --partitions 5
+```
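+
+If you want to double-check that the topics were created, you can list them (assuming the same `brew`-installed Kafka 0.10.0):
+
+```
+kafka-topics --list --zookeeper localhost:2181
+```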
+
+## Feeding Data
+
+Let's feed the data to the input topic. In this page we will generate fake Apache logs, parse them into JSON, and feed the JSON to a Kafka topic.
+
+Create a working directory, since we will clone a project and also set up a virtualenv there.
+
+In your working directory, run `virtualenv env` to set up a virtualenv in the `env` directory, and activate it.
+
+```
+$ virtualenv env
+$ source env/bin/activate
+```
+
+Feel free to `deactivate` when you're done with the example.
+ 
+### Install and modify Fake-Apache-Log-Generator
+
+`Fake-Apache-Log-Generator` is not published as a package, and we also need to modify the script.
+
+```
+$ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
+$ cd Fake-Apache-Log-Generator
+```
+
+Open `apache-fake-log-gen.py` and replace the statements inside the `while (flag):` loop with the following:
+
+```
+        elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec
+        seconds=random.randint(30,300)
+        increment = datetime.timedelta(seconds=seconds)
+        otime += increment
+
+        ip = faker.ipv4()
+        dt = otime.strftime('%d/%b/%Y:%H:%M:%S')
+        tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z')
+        vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2])
+
+        uri = random.choice(resources)
+        if uri.find("apps")>0:
+                uri += `random.randint(1000,10000)`
+
+        resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04])
+        byt = int(random.gauss(5000,50))
+        referer = faker.uri()
+        useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )()
+        f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent))
+
+        log_lines = log_lines - 1
+        flag = False if log_lines == 0 else True
+```
+
+to make sure a fake `elapsed_us` value is included in the fake log.
+
+For convenience, you can skip cloning the project and download the modified file from here: [apache-fake-log-gen.py (gist)](https://gist.github.com/HeartSaVioR/79fd4e461604fabecf535ffece47e6c2)
+
+### Install apache-log-parser and write parsing script
+
+`apache-log-parser` can be installed via `pip`.
+
+```
+$ pip install apache-log-parser
+```
+
+Since apache-log-parser is a library, we need to write a small Python script to parse the fake logs.
+Let's create a file `parse-fake-log-gen-to-json-with-incrementing-id.py` with the content below:
+
+```
+import sys
+import apache_log_parser
+import json
+
+auto_incr_id = 1
+parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
+line_parser = apache_log_parser.make_parser(parser_format)
+while True:
+  # we'll use pipe
+  line = sys.stdin.readline()
+  if not line:
+    break
+  parsed_dict = line_parser(line)
+  parsed_dict['id'] = auto_incr_id
+  auto_incr_id += 1
+
+  # works only python 2, but I don't care cause it's just a test module :)
+  parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')}
+  print json.dumps(parsed_dict)
+```
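+
+If you're on Python 3, a minimal adaptation of the same script (only the dict iteration and the print call change) might look like this, assuming apache-log-parser itself works on your Python 3 installation:
+
+```
+import sys
+import apache_log_parser
+import json
+
+auto_incr_id = 1
+parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
+line_parser = apache_log_parser.make_parser(parser_format)
+while True:
+  # we'll use pipe
+  line = sys.stdin.readline()
+  if not line:
+    break
+  parsed_dict = line_parser(line)
+  parsed_dict['id'] = auto_incr_id
+  auto_incr_id += 1
+
+  # items() instead of iteritems(), and print() as a function
+  parsed_dict = {k.upper(): v for k, v in parsed_dict.items() if not k.endswith('datetimeobj')}
+  print(json.dumps(parsed_dict))
+```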
+
+### Feed parsed JSON Apache Log to Kafka
+
+OK! We're ready to feed the data to the Kafka topic. Let's use `kafka-console-producer` to feed the parsed JSON.
+
+```
+$ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs
+```
+
+and execute the following in another terminal session to confirm that data is being fed.
+
+```
+$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs
+```
+
+If you see JSON like the example below, it's working:
+
+```
+{"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", 
"REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", 
"TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": 
"2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", 
"REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", 
"REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, 
"REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", 
"REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; 
Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", 
"REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", 
"REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": 
"2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", 
"REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": 
"", "REQUEST_URL_USERNAME": null, 
"REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": 
null, "REQUEST_HEADER_USER_
 AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, 
"REQUEST_HEADER_REFERER": "http://white.com/terms/";, "REQUEST_URL_PORT": null, 
"REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", 
"REMOTE_IP": "88.203.90.62"}
+```
+
+## Example: filtering error logs
+ 
+In this example we'll filter error logs out of the entire log stream and store them in another topic. The `project` and `filter` features will be used.
+
+The content of the script file is as follows:
+
+```
+CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs'
+CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-errorlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4
+```
+
+Save this file to `apache_log_error_filtering.sql`.
+
+Let's take a look at the script.
+
+The first statement defines the table `APACHE_LOGS`, which represents the input stream. The `LOCATION` clause specifies the ZkHost (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`), and the topic (`apache-logs`).
+Note that the Kafka data source requires a primary key to be defined. That's why we added an incrementing integer id to the parsed JSON data.
+
+Similarly, the second statement specifies the table `APACHE_ERROR_LOGS` which 
represents the output stream. The `TBLPROPERTIES` clause specifies the 
configuration of 
[KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs) and 
is required for a Kafka sink table.
+ 
+The last statement defines the topology. Storm SQL defines and runs a topology only for DML statements.
+DDL statements define the input data sources, output data sources, user-defined functions, and user-defined aggregate functions that are referenced by the DML statement.
+
+Let's look at the WHERE clause first. Since we want to filter error logs, we divide the status by 100 and check whether the quotient is equal to or greater than 4 (an easier representation is simply `CAST(STATUS AS INT) >= 400`).
+Since the status in JSON is a string (hence represented as VARCHAR in the APACHE_LOGS table), we apply `CAST(STATUS AS INT)` to convert it to an integer before applying the division.
+Now only error logs pass the filter.
+
+Next, the SELECT list transforms some columns to match the output stream. We apply `CAST(STATUS AS INT)` to convert the status to an integer, and divide TIME_US by 1000 to convert microseconds to milliseconds.
+
+Last, the INSERT statement stores the filtered and transformed rows (tuples) in the output stream.
+
+To run this example, users need to include the data sources (`storm-sql-kafka` in this case) and their dependencies in the classpath. Dependencies for Storm SQL itself are handled automatically when users run `storm sql`.
+Users can include data sources at the submission step like below:
+
+```
+$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+The above command submits the SQL statements to Storm SQL. The usage of the command is `storm sql [script file] [topology name]`.
+Users need to modify each artifact's version if they are using a different version of Storm or Kafka.
+ 
+If your statements pass the validation phase, the topology will be shown on the Storm UI page.
+
+You can see the output via console:
+
+```
+$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-errorlogs
+```
+
+and the output will be similar to:
+
+```
+{"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0
 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 
Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222}
+{"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0
 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 
Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851}
+...
+```
+
+You can also run the Storm SQL runner to see the logical plan by placing `--explain` instead of the topology name:
+
+```
+$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+and the output will be similar to:
+
+```
+LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
+  LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7
+    LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6
+      EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5
+```
+
+It might not be exactly the same as what you see, depending on which query optimizations Storm SQL applies.
+
+We're running our first Storm SQL topology! Please kill the topology when you have seen enough output and logs.
+
+To be concise, we'll skip explaining the things we've already seen.
+
+## Example: filtering slow logs
+
+In this example we'll filter slow logs out of the entire log stream and store them in another topic. The `project`, `filter`, and `User Defined Function (UDF)` features will be used.
+This is very similar to `filtering error logs`, but here we'll see how to define a `User Defined Function (UDF)`.
+
+The content of the script file is as follows:
+
+```
+CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-slowlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2'
+INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100
+```
+
+Save this file to `apache_log_slow_filtering.sql`.
+
+We can skip the first two statements since they are almost the same as in the previous example.
+
+The third statement defines a `User Defined Function`. We're defining `GET_TIME`, which uses the `org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2` class.
+
+The implementation of GetTime2 is here:
+
+```
+package org.apache.storm.sql.runtime.functions.scalar.datetime;
+
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+public class GetTime2 {
+    public static Long evaluate(String dateString, String dateFormat) {
+        try {
+            DateTimeFormatter df = DateTimeFormat.forPattern(dateFormat).withZoneUTC();
+            return df.parseDateTime(dateString).getMillis();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+}
+```
+
+This class can be used as a UDF since it defines a static `evaluate` method. The SQL types of the parameters and the return value are determined by Calcite, which Storm SQL depends on.
+
+Note that this class must be on the classpath, so in order to define a UDF you need to create a jar file which contains the UDF classes and run `storm sql` with the `--jar` option.
+For simplicity, this page assumes that GetTime2 is already on the classpath.
+ 
+The last statement is very similar to filtering error logs. The only new thing is that we call `GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ')` to convert the string timestamp to a unix timestamp in milliseconds (BIGINT).
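+
+As a quick sanity check of what `GET_TIME` computes (this snippet is not part of the example; it just exercises the same Joda-Time pattern and assumes joda-time is on the classpath), you can run:
+
+```
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+public class GetTime2Check {
+    public static void main(String[] args) {
+        // Same pattern as in the SQL above, without the SQL string-literal escaping of the quotes
+        DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZZ").withZoneUTC();
+        // Prints 1622864699000, matching the TIME_RECEIVED_TIMESTAMP values in the sample output below
+        System.out.println(df.parseDateTime("2021-06-05T03:44:59+00:00").getMillis());
+    }
+}
+```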
+
+Let's execute it.
+
+```
+$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql 
apache_log_slow_filtering --artifacts 
"org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+You can see the output via console:
+
+```
+$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slowlogs
+```
+
+and the output will be similar to:
+
+```
+{"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0
 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 
Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579}
+{"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0
 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 
Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957}
+...
+```
+
+That's it! Supposing we had a UDF which looks up the geo location for a remote IP, we could filter by geo location, or enrich the transformed result with it.
+
+## Example: slow logs aggregation
+
+In this example we'll aggregate the entire log stream by request URL and method, filter on the aggregated values, and store the result in another topic.
+The `project`, `filter`, `aggregation`, and `User Defined Aggregate Function (UDAF)` features will be used.
+
+The content of the script file is as follows:
+
+```
+CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (DUMMY_ID INT PRIMARY KEY, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, CNT INT, TIME_ELAPSED_MS_MIN INT, TIME_ELAPSED_MS_MAX INT, TIME_ELAPSED_MS_AVG INT, TIME_ELAPSED_MS_PERCENTILE_99 BIGINT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-slowlogs-aggregated' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+CREATE FUNCTION PERCENTILE_99 AS 'org.apache.storm.sql.runtime.functions.aggregate.Percentile99'
+INSERT INTO APACHE_SLOW_LOGS SELECT MIN(ID), REQUEST_URL, REQUEST_METHOD, COUNT(*) AS CNT, MIN(TIME_US) / 1000 AS TIME_ELAPSED_MS_MIN, MAX(TIME_US) / 1000 AS TIME_ELAPSED_MS_MAX, AVG(TIME_US) / 1000 AS TIME_ELAPSED_MS_AVG, PERCENTILE_99(CAST(TIME_US AS BIGINT)) / 1000.0 AS TIME_ELAPSED_MS_PERCENTILE_99 FROM APACHE_LOGS GROUP BY REQUEST_URL, REQUEST_METHOD HAVING AVG(TIME_US) / 1000 >= 300
+```
+
+Save this file to `apache_log_slow_aggregation.sql`.
+
+This time we'll only look at the UDAF definition, since the other features have been covered already; the GROUP BY clause aggregates rows by REQUEST_URL and REQUEST_METHOD, and the HAVING clause filters on the aggregated value. Defining a UDAF is the same as defining a UDF, except for its class implementation.
+
+```
+package org.apache.storm.sql.runtime.functions.aggregate;
+
+import org.HdrHistogram.Histogram;
+
+public class Percentile99 {
+    public static Histogram init() {
+        return new Histogram(3600000000000L, 3);
+    }
+
+    public static Histogram add(Histogram accumulator, Long v) {
+        if (v != null) {
+            accumulator.recordValue(v);
+        }
+        return accumulator;
+    }
+
+    public static long result(Histogram accumulator) {
+        return accumulator.getValueAtPercentile(99);
+    }
+}
+```
+
+Note that Percentile99 defines three static methods: `init()`, `add()`, and `result()`. `init()` creates the accumulator, `add()` accumulates a value into the accumulator, and `result()` converts the accumulator into the final result.
+
+Since HdrHistogram works with long values, we cast TIME_US to BIGINT before passing it to PERCENTILE_99.
+
+As with UDFs, this class must be on the classpath, so in order to define a UDAF you need to create a jar file which contains the UDAF classes and run `storm sql` with the `--jar` option.
+For simplicity, this page assumes that Percentile99 is already on the classpath.
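+
+As an illustration of the same `init()`/`add()`/`result()` contract (this class is hypothetical and not part of Storm), a minimal sketch of a standard-deviation UDAF might look like the following. It could be registered the same way as PERCENTILE_99, for example with `CREATE FUNCTION MY_STDDEV AS 'org.apache.storm.sql.runtime.functions.aggregate.MyStdDev'`:
+
+```
+package org.apache.storm.sql.runtime.functions.aggregate;
+
+public class MyStdDev {
+    // init() creates the accumulator: {count, mean, M2} for Welford's online algorithm
+    public static double[] init() {
+        return new double[]{0.0, 0.0, 0.0};
+    }
+
+    // add() folds one value into the accumulator
+    public static double[] add(double[] acc, Double v) {
+        if (v != null) {
+            acc[0] += 1;
+            double delta = v - acc[1];
+            acc[1] += delta / acc[0];
+            acc[2] += delta * (v - acc[1]);
+        }
+        return acc;
+    }
+
+    // result() converts the accumulator into the final (sample) standard deviation
+    public static double result(double[] acc) {
+        return acc[0] > 1 ? Math.sqrt(acc[2] / (acc[0] - 1)) : 0.0;
+    }
+}
+```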
+
+Let's execute it.
+
+```
+$ $STORM_DIR/bin/storm sql apache_log_slow_aggregation.sql apache_log_slow_aggregation --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+You can see the output via console:
+
+```
+$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slowlogs-aggregated
+```
+
+and the output will be similar to:
+
+```
+{"DUMMY_ID":1741,"REQUEST_URL":"/apps/cart.jsp?appID=2242","REQUEST_METHOD":"POST","CNT":1,"TIME_ELAPSED_MS_MIN":303.08,"TIME_ELAPSED_MS_MAX":303.08,"TIME_ELAPSED_MS_AVG":303.08,"TIME_ELAPSED_MS_PERCENTILE_99":303.103}
+{"DUMMY_ID":1748,"REQUEST_URL":"/app/main/posts","REQUEST_METHOD":"PUT","CNT":7,"TIME_ELAPSED_MS_MIN":107.398,"TIME_ELAPSED_MS_MAX":913.603,"TIME_ELAPSED_MS_AVG":489.1445714285714,"TIME_ELAPSED_MS_PERCENTILE_99":913.919}
+...
+```
+
+That's it! Supposing we had a UDF which looks up the geo location for a remote IP, we could filter or group by geo location.
+And having more UDAFs, like stddev or additional percentiles, would provide better information about latency.
+
+## Summary
+
+We walked through several simple use cases for Storm SQL to learn its features. If you haven't looked at [Storm SQL integration](storm-sql.html) and [Storm SQL language](storm-sql-reference.html), you should read them to see the full set of supported features.
+
+Note that Storm SQL runs on Trident, which is micro-batch based and not strongly typed.
+Aggregation (and join, which this example doesn't touch) happens within a batch, and the sink doesn't actually check the types.
+(You may have noticed that the types of some output fields differ from the output table schema.)
+
+This behavior is subject to change when Storm SQL switches its backend API to the core (tuple-by-tuple, low-level or high-level) API.
\ No newline at end of file
