This is an automated email from the ASF dual-hosted git repository.

gates pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b6f371a  HIVE-21869 Clean up the Kafka storage handler readme and examples (Kristopher Kane via Alan Gates)
b6f371a is described below

commit b6f371ad95e654f47f2a55233af2959182379eb9
Author: Alan Gates <ga...@hortonworks.com>
AuthorDate: Tue Jun 25 14:12:27 2019 -0700

    HIVE-21869 Clean up the Kafka storage handler readme and examples (Kristopher Kane via Alan Gates)
---
 kafka-handler/README.md | 437 ++++++++++++++++++++++++++++++++++++------------
 1 file changed, 328 insertions(+), 109 deletions(-)

diff --git a/kafka-handler/README.md b/kafka-handler/README.md
index c986d85..753e3e3 100644
--- a/kafka-handler/README.md
+++ b/kafka-handler/README.md
@@ -1,33 +1,59 @@
 # Kafka Storage Handler Module
 
-Storage Handler that allows user to Connect/Analyse/Transform Kafka topics.
-The workflow is as follow,  first the user will create an external table that is a view over one Kafka topic,
-then the user will be able to run any SQL query including write back to the same table or different kafka backed table.
+A storage handler that allows users to connect to, analyze, and transform Kafka topics.
+The workflow is as follows:
+- First, the user creates an external table that is a view over one Kafka topic
+- Second, the user can run any SQL query, including writes back to the same table or to a different Kafka-backed table
+
+## Kafka Management
+
+Kafka Java client version: 2.x
+
+This handler does not commit offsets of topic partition reads, either using the intrinsic Kafka capability or in external
+storage. This means a query over a Kafka-backed table will be a full topic read unless partitions are filtered
+manually, via SQL, by the methods described below. In the ETL section, a method for storing topic offsets in Hive tables
+is provided for tracking consumer position, but this is not part of the handler itself.
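+
+For example (a minimal sketch; it assumes the `kafka_table` created in the Usage section below, and the offset values
+are hypothetical), a read can be bounded to a single partition and offset range so it does not scan the whole topic:
+
+```sql
+SELECT
+  COUNT(*)
+FROM
+  kafka_table
+WHERE
+  `__partition` = 0
+  AND `__offset` > 42
+  AND `__offset` < 100;
+```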
 
 ## Usage
 
 ### Create Table
-Use following statement to create table:
+Use the following statement to create a table:
+
 ```sql
-CREATE EXTERNAL TABLE kafka_table
-(`timestamp` timestamp , `page` string,  `newPage` boolean,
- added int, deleted bigint, delta double)
-STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
-TBLPROPERTIES
-("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
+CREATE EXTERNAL TABLE 
+  kafka_table (
+    `timestamp` TIMESTAMP,
+    `page` STRING,
+    `newPage` BOOLEAN,
+    `added` INT, 
+    `deleted` BIGINT,
+    `delta` DOUBLE)
+STORED BY 
+  'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES ( 
+  "kafka.topic" = "test-topic",
+  "kafka.bootstrap.servers" = "localhost:9092");
 ```
-Table property `kafka.topic` is the Kafka Topic to connect to and `kafka.bootstrap.servers` is the Broker connection string.
+
+The table property `kafka.topic` is the Kafka topic to connect to, and `kafka.bootstrap.servers` is the Kafka broker connection string.
 Both properties are mandatory.
-On the write path if such a topic does not exists the topic will be created if Kafka broker admin policy allow such operation.
+On the write path, if the topic does not exist, it will be created, provided the Kafka broker admin policy allows
+auto topic creation.
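+
+As a minimal write sketch (it assumes the `kafka_table` above and mirrors the ETL from Hive to Kafka example at the end
+of this document), a write back to the topic supplies the user columns plus the metadata columns described in the
+Table Definitions section below:
+
+```sql
+-- Write a bounded slice of records back to the topic; NULL key/partition and
+-- -1 offset/timestamp let Kafka assign them (the filter values are hypothetical).
+INSERT INTO TABLE
+  kafka_table
+SELECT
+  `timestamp`,
+  `page`,
+  `newPage`,
+  `added`,
+  `deleted`,
+  `delta`,
+  NULL AS `__key`,
+  NULL AS `__partition`,
+  -1 AS `__offset`,
+  -1 AS `__timestamp`
+FROM
+  kafka_table
+WHERE
+  `__partition` = 0 AND `__offset` < 10;
+```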
+
+By default, the serializer and deserializer is JSON, specifically `org.apache.hadoop.hive.serde2.JsonSerDe`.
+
+If you want to change the serializer/deserializer class, you can update the TBLPROPERTIES with an `ALTER TABLE` statement.
 
-By default the serializer and deserializer is Json `org.apache.hadoop.hive.serde2.JsonSerDe`.
-If you want to switch serializer/deserializer classes you can use alter table.
 ```sql
-ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
-``` 
-List of supported Serializer Deserializer:
+ALTER TABLE 
+  kafka_table 
+SET TBLPROPERTIES (
+  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+```
+ 
+List of supported serializers and deserializers:
 
-|Supported Serializer Deserializer|
+|Supported Serializers and Deserializers|
 |-----|
 |org.apache.hadoop.hive.serde2.JsonSerDe|
 |org.apache.hadoop.hive.serde2.OpenCSVSerde|
@@ -35,8 +61,9 @@ List of supported Serializer Deserializer:
 |org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe|
 |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|
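+
+As a sketch, the serde can also be set at table creation time in the TBLPROPERTIES, as is done with Avro in the
+ETL from Hive to Kafka section below (the table and topic names here are hypothetical):
+
+```sql
+CREATE EXTERNAL TABLE
+  kafka_csv_table (
+    `field_1` STRING,
+    `field_2` STRING)
+STORED BY
+  'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES (
+  "kafka.topic" = "csv-topic",
+  "kafka.bootstrap.servers" = "localhost:9092",
+  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.OpenCSVSerde");
+```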
 
-#### Table definition 
-In addition to the user defined payload schema Kafka Storage Handler will append additional columns allowing user to query the Kafka metadata fields:
+#### Table Definitions 
+In addition to the user-defined column schema, this handler will append additional columns allowing
+the user to query the Kafka metadata fields:
 - `__key` Kafka record key (byte array)
 - `__partition` Kafka record partition identifier (int 32)
 - `__offset` Kafka record offset (int 64)
@@ -47,71 +74,139 @@ In addition to the user defined payload schema Kafka Storage Handler will append
 
 List the table properties and all the partition/offsets information for the topic. 
 ```sql
-Describe extended kafka_table;
+DESCRIBE EXTENDED 
+  kafka_table;
 ```
 
-Count the number of records with Kafka record timestamp within the last 10 minutes interval.
+Count the number of records where the record timestamp is within the last 10 minutes of query execution time.
 
 ```sql
-SELECT count(*) from kafka_table 
-where `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
+SELECT 
+  COUNT(*)
+FROM 
+  kafka_table 
+WHERE 
+  `__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '10' MINUTES);
 ```
-The storage handler allow filter push-down read optimization,
-for instance the query above will only read the records with timestamp satisfying the filter predicate. 
-Please note that such time based seek is only viable if the Kafka broker allow time based lookup (Kafka 0.11 or later versions)
 
-In addition to **time based seek**, the storage handler reader is able to seek to a particular partition offset using the SQL WHERE clause.
-Currently only support OR/AND with (<, <=, >=, >)
+The storage handler pushes filters on these metadata fields down to the Kafka reader as a read optimization.
+For instance, the query above will only read the records with a timestamp satisfying the filter predicate.
+Please note that such time based filtering (Kafka consumer partition seek) is only viable if the Kafka broker
+version allows time based lookup (Kafka 0.11 or later versions).
+
+In addition to **time based filtering**, the storage handler reader is able to filter based on a
+particular partition offset using the SQL WHERE clause.
+It currently supports the operators `OR` and `AND` with the comparison operators `<`, `<=`, `>=`, `>`.
+
+#### Metadata Query Examples
 
 ```sql
-SELECT count(*)  from kafka_table
-where (`__offset` < 10 and `__offset`>3 and `__partition` = 0)
-or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
-or (`__offset` = 109);
+SELECT
+  COUNT(*)
+FROM 
+  kafka_table
+WHERE 
+  (`__offset` < 10 AND `__offset` > 3 AND `__partition` = 0)
+  OR 
+  (`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99)
+  OR (`__offset` = 109);
 ```
 
-User can define a view to take of the last 15 minutes and mask what ever column as follow:
+Keep in mind that the number of partitions in a topic can change within the Kafka cluster without the consumer's
+knowledge. This partition and offset capability is good for replaying specific partitions when the consumer knows that
+something has gone wrong downstream or a replay is required. Apache Hive users may or may not understand the underlying
+architecture of Kafka; therefore, filtering on the record timestamp metadata column is arguably the best filter to use
+since it requires no partition knowledge.
+
+The user can define a view of the last 15 minutes of data and mask whatever columns they like, as follows:
 
 ```sql
-CREATE VIEW last_15_minutes_of_kafka_table as select  `timestamp`, `user`, delta, added from kafka_table 
-where `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
+CREATE VIEW 
+  last_15_minutes_of_kafka_table 
+AS 
+SELECT 
+  `timestamp`,
+  `user`, 
+  `delta`, 
+  `added` 
+FROM 
+  kafka_table 
+WHERE 
+  `__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '15' MINUTES);
 ```
 
-Join the Kafka Stream to Hive table. For instance assume you want to join the last 15 minutes of stream to dimension table like the following.
+Join the Kafka topic to a Hive table. For instance, assume you want to join the last 15 minutes of the topic to
+a dimension table such as the following:
+
 ```sql
-CREATE TABLE user_table (`user` string, `first_name` string , age int, gender string, comments string) STORED as ORC ;
+CREATE TABLE 
+  user_table (
+  `user` STRING, 
+  `first_name` STRING, 
+  `age` INT, 
+  `gender` STRING, 
+  `comments` STRING ) 
+STORED AS ORC;
 ```
 
-Join the view of the last 15 minutes to `user_table`, group by user gender column and compute aggregates
-over metrics from fact table and dimension table.
+Join the view of the last 15 minutes to `user_table`, group by the `gender` column and compute aggregates
+over metrics from the fact and dimension tables.
 
 ```sql
-SELECT sum(added) as added, sum(deleted) as deleted, avg(delta) as delta, avg(age) as avg_age , gender 
-FROM last_15_minutes_of_kafka_table  join user_table on `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
-GROUP BY gender limit 10;
+SELECT 
+  SUM(`added`) AS `added`, 
+  SUM(`deleted`) AS `deleted`, 
+  AVG(`delta`) AS `delta`, 
+  AVG(`age`) AS `avg_age`, 
+  `gender` 
+FROM 
+  last_15_minutes_of_kafka_table 
+JOIN 
+  user_table ON 
+    last_15_minutes_of_kafka_table.`user` = user_table.`user`
+GROUP BY 
+  `gender` 
+LIMIT 10;
 ```
 
+In cases where you want to perform some ad-hoc analysis over the last 15 minutes of topic data,
+you can join it on itself. In the following example, we show how you can perform classical
+user retention analysis over the Kafka topic.
 
-Join the Stream to the Stream it self. In cases where you want to perform some Ad-Hoc query over the last 15 minutes view.
-In the following example we show how you can perform classical user retention analysis over the Kafka Stream.
 ```sql
--- Steam join over the view it self
+-- Topic join over the view itself
 -- The example is adapted from https://www.periscopedata.com/blog/how-to-calculate-cohort-retention-in-sql
--- Assuming l15min_wiki is a view of the last 15 minutes
-select  count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
-from l15min_wiki as activity
-left join l15min_wiki as future_activity on
+-- Assuming l15min_wiki is a view of the last 15 minutes based on the topic's record timestamp metadata
+
+SELECT 
+  COUNT(DISTINCT `activity`.`user`) AS `active_users`, 
+  COUNT(DISTINCT `future_activity`.`user`) AS `retained_users`
+FROM 
+  l15min_wiki AS activity
+LEFT JOIN 
+  l15min_wiki AS future_activity
+ON
   activity.`user` = future_activity.`user`
-  and activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ;
-
---  Stream to stream join
--- Assuming wiki_kafka_hive is the entire stream.
-select floor_hour(activity.`timestamp`), count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
-from wiki_kafka_hive as activity
-left join wiki_kafka_hive as future_activity on
+AND 
+  activity.`timestamp` = future_activity.`timestamp` - INTERVAL '5' MINUTES;
+
+--  Topic to topic join
+-- Assuming wiki_kafka_hive is the entire topic
+
+SELECT 
+  FLOOR_HOUR(activity.`timestamp`), 
+  COUNT(DISTINCT activity.`user`) AS `active_users`, 
+  COUNT(DISTINCT future_activity.`user`) AS `retained_users`
+FROM 
+  wiki_kafka_hive AS activity
+LEFT JOIN 
+  wiki_kafka_hive AS future_activity 
+ON
   activity.`user` = future_activity.`user`
-  and activity.`timestamp` = future_activity.`timestamp` - interval '1' hour group by floor_hour(activity.`timestamp`); 
-
+AND 
+  activity.`timestamp` = future_activity.`timestamp` - INTERVAL '1' HOUR 
+GROUP BY 
+  FLOOR_HOUR(activity.`timestamp`); 
 ```
 
 # Configuration
@@ -130,88 +225,212 @@ left join wiki_kafka_hive as future_activity on
 
 
 ### Setting Extra Consumer/Producer properties.
-User can inject custom Kafka consumer/producer properties via the Table properties.
-To do so user can add any key/value pair of Kafka config to the Hive table property
+The user can inject custom Kafka consumer/producer properties via the table properties.
+To do so, the user can add any key/value pair of Kafka config to the Hive table property
 by prefixing the key with `kafka.consumer` for consumer configs and `kafka.producer` for producer configs.
 For instance the following alter table query adds the table property `"kafka.consumer.max.poll.records" = "5000"` 
 and will inject `max.poll.records=5000` to the Kafka Consumer.
+
 ```sql
-ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.consumer.max.poll.records"="5000");
+ALTER TABLE 
+  kafka_table 
+SET TBLPROPERTIES 
+  ("kafka.consumer.max.poll.records" = "5000");
 ```
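+
+Similarly (a brief sketch; `acks` is a standard Kafka producer property), prefixing a key with `kafka.producer`
+passes it to the producer used on the write path:
+
+```sql
+ALTER TABLE
+  kafka_table
+SET TBLPROPERTIES
+  ("kafka.producer.acks" = "all");
+```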
 
-# Kafka to Hive ETL PIPE LINE
+# Kafka to Hive ETL Pipeline Example
+
+In this example we will load topic data only once. The goal is to read the data and commit both the data and
+its offsets in a single transaction.
 
-load form Kafka every Record exactly once
-Goal is to read data and commit both data and its offsets in a single Transaction 
+First, create the offset table.
 
-First create the offset table.
 ```sql
-Drop table kafka_table_offsets;
-create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
+DROP TABLE 
+  kafka_table_offsets;
+  
+CREATE TABLE 
+  kafka_table_offsets (
+  `partition_id` INT,
+  `max_offset` BIGINT,
+  `insert_time` TIMESTAMP);
 ``` 
 
 Initialize the table
+
 ```sql
-insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP 
-from wiki_kafka_hive group by `__partition`, CURRENT_TIMESTAMP ;
+INSERT OVERWRITE TABLE 
+  kafka_table_offsets 
+SELECT
+ `__partition`, 
+ MIN(`__offset`) - 1, 
+ CURRENT_TIMESTAMP 
+FROM 
+  wiki_kafka_hive 
+GROUP BY 
+  `__partition`, 
+  CURRENT_TIMESTAMP;
 ``` 
-Create the end target table on the Hive warehouse.
+
+Create the final Hive table for warehouse use:
+
 ```sql
-Drop table orc_kafka_table;
-Create table orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint,
- `timestamp` timestamp , `page` string, `user` string, `diffurl` string,
- `isrobot` boolean, added int, deleted int, delta bigint
-) stored as ORC;
+DROP TABLE 
+  orc_kafka_table;
+  
+CREATE TABLE 
+  orc_kafka_table (
+  `partition_id` INT,
+  `koffset` BIGINT,
+  `ktimestamp` BIGINT,
+  `timestamp` TIMESTAMP,
+  `page` STRING,
+  `user` STRING,
+  `diffurl` STRING,
+  `isrobot` BOOLEAN,
+  `added` INT,
+  `deleted` INT,
+  `delta` BIGINT) 
+STORED AS ORC;
 ```
-This an example tp insert up to offset = 2 only
+
+This is an example that inserts up to offset = 2 only.
 
 ```sql
-From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
-on (ktable.`__partition` = offset_table.partition_id
-and ktable.`__offset` > offset_table.max_offset and  ktable.`__offset` < 3 )
-insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
-`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
-Insert overwrite table kafka_table_offsets select
-`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+FROM
+  wiki_kafka_hive ktable 
+JOIN 
+  kafka_table_offsets offset_table
+ON (
+    ktable.`__partition` = offset_table.`partition_id`
+  AND 
+    ktable.`__offset` > offset_table.`max_offset` 
+  AND  
+    ktable.`__offset` < 3 )
+    
+INSERT INTO TABLE 
+  orc_kafka_table 
+SELECT 
+  `__partition`,
+  `__offset`,
+  `__timestamp`,
+  `timestamp`, 
+  `page`, 
+  `user`, 
+  `diffurl`, 
+  `isrobot`,
+  `added`, 
+  `deleted`,
+  `delta`
+  
+INSERT OVERWRITE TABLE 
+  kafka_table_offsets 
+SELECT
+  `__partition`,
+  max(`__offset`),
+  CURRENT_TIMESTAMP
+GROUP BY
+  `__partition`, 
+  CURRENT_TIMESTAMP;
 ```
 
-Double check the insert
+Double check the insert.
+
 ```sql
-select max(`koffset`) from orc_kafka_table limit 10;
-select count(*) as c  from orc_kafka_table group by partition_id, koffset having c > 1;
+SELECT
+  max(`koffset`) 
+FROM
+  orc_kafka_table 
+LIMIT 10;
+
+SELECT
+  COUNT(*) AS `c`  
+FROM
+  orc_kafka_table
+GROUP BY
+  `partition_id`,
+  `koffset` 
+HAVING 
+  `c` > 1;
 ```
 
-Repeat this periodically to insert all data.
+Run this periodically as new data becomes available on the topic.
 
 ```sql
-From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
-on (ktable.`__partition` = offset_table.partition_id
-and ktable.`__offset` > offset_table.max_offset )
-insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
-`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
-Insert overwrite table kafka_table_offsets select
-`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+FROM
+  wiki_kafka_hive ktable 
+JOIN 
+  kafka_table_offsets offset_table
+ON ( 
+    ktable.`__partition` = offset_table.`partition_id`
+  AND 
+    ktable.`__offset` > offset_table.`max_offset`)
+    
+INSERT INTO TABLE 
+  orc_kafka_table 
+SELECT 
+  `__partition`, 
+  `__offset`, 
+  `__timestamp`,
+  `timestamp`, 
+  `page`, 
+  `user`, 
+  `diffurl`, 
+  `isrobot`, 
+  `added`, 
+  `deleted`, 
+  `delta`
+  
+INSERT OVERWRITE TABLE 
+  kafka_table_offsets 
+SELECT
+  `__partition`, 
+  max(`__offset`), 
+  CURRENT_TIMESTAMP 
+GROUP BY 
+  `__partition`, 
+  CURRENT_TIMESTAMP;
 ```
 
 # ETL from Hive to Kafka
 
-## INSERT INTO
-First create the table in have that will be the target table. Now all the inserts will go to the topic mapped by this Table.
+## Kafka Topic Append with INSERT
+First, create the table in Hive that will be the target table. All inserts will then go to the topic mapped by
+this table. Be aware that the Avro SerDe used below is regular Apache Avro (with schema) and not the
+Confluent-serialized Avro that is popular with Kafka usage.
 
 ```sql
-CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
-(`channel` string, `namespace` string,`page` string, `timestamp` timestamp , avg_delta double )
-STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
-TBLPROPERTIES
-("kafka.topic" = "moving_avg_wiki_kafka_hive_2",
-"kafka.bootstrap.servers"="cn105-10.l42scl.hortonworks.com:9092",
--- STORE AS AVRO IN KAFKA
-"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+CREATE EXTERNAL TABLE 
+  moving_avg_wiki_kafka_hive ( 
+    `channel` STRING, 
+    `namespace` STRING,
+    `page` STRING,
+    `timestamp` TIMESTAMP, 
+    `avg_delta` DOUBLE)
+STORED BY 
+  'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES(
+  "kafka.topic" = "moving_avg_wiki_kafka_hive_2",
+  "kafka.bootstrap.servers"="localhost:9092",
+  -- STORE AS AVRO IN KAFKA
+  "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
 
-Then insert data into the table. Keep in mind that Kafka is an append only, thus you can not use insert overwrite. 
+Then, insert data into the table. Keep in mind that Kafka is append only, thus you cannot use INSERT OVERWRITE.
+
 ```sql
-insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`, 
-avg(delta) over (order by `timestamp` asc rows between  60 preceding and current row) as avg_delta, 
-null as `__key`, null as `__partition`, -1, -1 from l15min_wiki;
+INSERT INTO TABLE
+  moving_avg_wiki_kafka_hive 
+SELECT 
+  `channel`, 
+  `namespace`, 
+  `page`, 
+  `timestamp`, 
+  AVG(`delta`) OVER (ORDER BY `timestamp` ASC ROWS BETWEEN 60 PRECEDING AND CURRENT ROW) AS `avg_delta`, 
+  null AS `__key`, 
+  null AS `__partition`, 
+  -1, 
+  -1 
+FROM 
+  l15min_wiki;
 ```
