[
https://issues.apache.org/jira/browse/HIVE-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
slim bouguerra updated HIVE-20377:
----------------------------------
Description:
h1. Goal
* Read streaming data from a Kafka topic as an external table.
* Allow streaming navigation by pushing down filters on Kafka record partition
id, offset and timestamp.
* Insert streaming data from Kafka into an actual Hive internal table using a CTAS
statement (see the CTAS sketch at the end of the examples below).
h1. Example
h2. Create the external table
{code}
CREATE EXTERNAL TABLE kafka_table (`timestamp` timestamp, page string, `user`
string, language string, added int, deleted int, flags string, comment string,
namespace string)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "wikipedia",
"kafka.bootstrap.servers"="brokeraddress:9092",
"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe");
{code}
h2. Kafka Metadata
In order to keep track of Kafka records, the storage handler will automatically
add the Kafka row metadata, e.g. partition id, record offset and record
timestamp.
{code}
DESCRIBE EXTENDED kafka_table
timestamp timestamp from deserializer
page string from deserializer
user string from deserializer
language string from deserializer
country string from deserializer
continent string from deserializer
namespace string from deserializer
newpage boolean from deserializer
unpatrolled boolean from deserializer
anonymous boolean from deserializer
robot boolean from deserializer
added int from deserializer
deleted int from deserializer
delta bigint from deserializer
__partition int from deserializer
__offset bigint from deserializer
__timestamp bigint from deserializer
{code}
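These metadata columns can be queried like any other column. A minimal sketch (assuming the kafka_table definition above) that reports, per partition, the highest offset seen and the record count:
{code}
SELECT `__partition`, max(`__offset`) AS max_offset, count(*) AS records
FROM kafka_table
GROUP BY `__partition`;
{code}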
h2. Filter push down
Newer Kafka consumers (0.11.0 and higher) allow seeking on the stream to a
given offset. The proposed storage handler will be able to leverage this API by
pushing down filters over the metadata columns, namely __partition (int),
__offset (long) and __timestamp (long).
For instance, a query like
{code}
select `__offset` from kafka_table where (`__offset` < 10 and `__offset`>3 and
`__partition` = 0) or (`__partition` = 0 and `__offset` < 105 and `__offset` >
99) or (`__offset` = 109);
{code}
will result in a scan of partition 0 only, reading only the records with offsets
between 4 and 109.
h2. With timestamp seeks
Seeking based on the internal timestamps allows the handler to run over only
recently arrived data, for instance:
{code}
select count(*) from kafka_table where `__timestamp` > 1000 *
to_unix_timestamp(CURRENT_TIMESTAMP - interval '20' hours) ;
{code}
This allows implicit relationships between event timestamps and Kafka
timestamps to be expressed in queries (i.e. the event timestamp is always less
than the Kafka __timestamp, and the Kafka __timestamp is never more than 15
minutes after the event, etc.).
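For example, assuming the Kafka ingest timestamp never trails the event `timestamp` column by more than 15 minutes (an assumed bound, not something enforced by the handler), an event-time predicate can be paired with a slightly wider `__timestamp` bound that the handler can push down:
{code}
-- hypothetical sketch: the 15 minute slack on `__timestamp` is an assumption
SELECT count(*)
FROM kafka_table
WHERE `timestamp` > (CURRENT_TIMESTAMP - interval '1' hours)
  AND `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '75' minutes);
{code}
Only the `__timestamp` predicate is pushed down to the Kafka consumer seek; the wider bound guarantees that no rows matched by the event-time predicate are skipped.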
h2. More examples with Avro
{code}
CREATE EXTERNAL TABLE wiki_kafka_avro_table
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "wiki_kafka_avro_table",
"kafka.bootstrap.servers"="localhost:9092",
"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
'avro.schema.literal'='{
"type" : "record",
"name" : "Wikipedia",
"namespace" : "org.apache.hive.kafka",
"version": "1",
"fields" : [ {
"name" : "isrobot",
"type" : "boolean"
}, {
"name" : "channel",
"type" : "string"
}, {
"name" : "timestamp",
"type" : "string"
}, {
"name" : "flags",
"type" : "string"
}, {
"name" : "isunpatrolled",
"type" : "boolean"
}, {
"name" : "page",
"type" : "string"
}, {
"name" : "diffurl",
"type" : "string"
}, {
"name" : "added",
"type" : "long"
}, {
"name" : "comment",
"type" : "string"
}, {
"name" : "commentlength",
"type" : "long"
}, {
"name" : "isnew",
"type" : "boolean"
}, {
"name" : "isminor",
"type" : "boolean"
}, {
"name" : "delta",
"type" : "long"
}, {
"name" : "isanonymous",
"type" : "boolean"
}, {
"name" : "user",
"type" : "string"
}, {
"name" : "deltabucket",
"type" : "double"
}, {
"name" : "deleted",
"type" : "long"
}, {
"name" : "namespace",
"type" : "string"
} ]
}'
);
{code}
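h2. Insert into a Hive table via CTAS
To illustrate the third goal, a minimal CTAS sketch (the target table name and storage format are hypothetical; the columns come from the kafka_table example above):
{code}
CREATE TABLE wiki_hive_table
STORED AS ORC
AS
SELECT `timestamp`, page, `user`, added, deleted
FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '24' hours);
{code}
The result is a regular Hive internal table; the Kafka metadata columns are dropped simply by not selecting them.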
> Hive Kafka Storage Handler
> --------------------------
>
> Key: HIVE-20377
> URL: https://issues.apache.org/jira/browse/HIVE-20377
> Project: Hive
> Issue Type: New Feature
> Affects Versions: 4.0.0
> Reporter: slim bouguerra
> Assignee: slim bouguerra
> Priority: Major
> Attachments: HIVE-20377.10.patch, HIVE-20377.11.patch,
> HIVE-20377.12.patch, HIVE-20377.4.patch, HIVE-20377.5.patch,
> HIVE-20377.6.patch, HIVE-20377.8.patch, HIVE-20377.8.patch, HIVE-20377.patch
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)