[
https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480062#comment-16480062
]
ASF GitHub Bot commented on DRILL-5977:
---------------------------------------
aravi5 opened a new pull request #1272: DRILL-5977: Filter Pushdown in
Drill-Kafka plugin
URL: https://github.com/apache/drill/pull/1272
# Implementation of Filter Pushdown in Drill-Kafka plugin
This PR contains changes implementing the `Filter Pushdown` feature for
conditions on `Metadata fields` such as *Timestamps*, *Offsets* and
*Partitions*. Below is a high-level description to help the reviewers.
[Design document is available
here](https://docs.google.com/document/d/1SZ4wO4Ii4nAHwgWbY6JJynPOseCha0DBBZJ2ig1RO0M/edit#heading=h.uvq8jbet5xdh)
## Topics, Partitions, Offsets and Timestamps
Apache Kafka is a distributed `publisher-subscriber` system. Kafka consists
of `Topics`, which are something like channels. Producers produce to topics and
Consumers subscribe to topics. In the Drill world, every topic is treated as
a `Table`.
To achieve load balancing and parallelism, Kafka introduces the concept of
`Partitions`. Each topic can have multiple partitions, and messages produced to
a topic are distributed across its partitions (producers can also specify the
partition to produce to).
Kafka is essentially a distributed log, so every message within a
`Topic-Partition` is identified by a unique `Offset`. This offset represents
the distance of any message from the _beginning_ of the topic-partition.
Since Kafka can be used to store events / event logs, one would look to
Kafka to find out when an event occurred. Every message is therefore associated
with a `Timestamp`. Kafka allows the producer (user application) to specify the
timestamp value, or it can add its own (server / broker) timestamp before storing
the message / log in a topic-partition. Note that since the user can specify the
timestamp value, it is possible for timestamp values to appear out of
order as we consume from beginning to end.
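For illustration, the sketch below (not part of the PR; the topic name, keys and values are made up) shows a producer supplying explicit timestamps via the standard Kafka producer API, which can leave a partition's timestamps out of order:
```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OutOfOrderTimestamps {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // The third constructor argument is an explicit, user-supplied timestamp.
      // A newer event is sent first, then an older one, so the stored
      // timestamps do not increase with the offsets.
      producer.send(new ProducerRecord<>("events", null, 1526000000000L, "k1", "newer event"));
      producer.send(new ProducerRecord<>("events", null, 1525000000000L, "k2", "older event"));
    }
  }
}
```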
## Drill-Kafka Plugin
The Drill-Kafka plugin is very useful for exploring data and performing analytics.
Currently, all of the data that resides in a topic has to be consumed by Drill
before filters are applied. However, it is possible to reduce the number of
messages read from Kafka based on conditions provided on the fields
`kafkaMsgTimestamp`, `kafkaMsgOffset` and `kafkaPartitionId`.
The scan specification for the Drill-Kafka plugin is a collection of specifications
for the individual partitions within each topic. Each per-partition specification
consists of `topicName`, `partitionId`, `startOffset` and `endOffset`.
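A rough sketch of that per-partition specification (hypothetical class and member names that mirror this description, not necessarily the PR's actual code):
```
// Hypothetical sketch of a per-partition scan specification; the names mirror
// the description above and are not necessarily those in the PR.
public class KafkaPartitionScanSpec {
  private final String topicName;
  private final int partitionId;
  private long startOffset;  // first offset to read (inclusive)
  private long endOffset;    // offset at which to stop reading

  public KafkaPartitionScanSpec(String topicName, int partitionId,
                                long startOffset, long endOffset) {
    this.topicName = topicName;
    this.partitionId = partitionId;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }

  // Filter pushdown narrows the [startOffset, endOffset] range per partition.
  public void setStartOffset(long offset) { this.startOffset = offset; }
  public void setEndOffset(long offset) { this.endOffset = offset; }
}
```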
## Filter Pushdown Implementation
The filter pushdown implementation is governed by the idea that conditions on
`metadata fields` can be translated into adjusted `startOffset` and
`endOffset` values for each partition, thus reducing the amount of data read.
### Pushdown Timestamp
A common query against an event-log system involves conditions on a
timestamp field. For the Drill-Kafka plugin, the `kafkaMsgTimestamp`
field maps to the timestamp stored as part of each message in Kafka. Pushdown
is supported for the following compare functions: `equal`, `greater_than` and
`greater_than_or_equal_to`.
Kafka exposes a `Consumer API` to obtain the earliest offset for a given
timestamp value.
```
public java.util.Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(
    java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch)
```
This API is used to figure out the `startOffset` for each partition in a
topic. Note that the timestamps may not appear in increasing order when reading
from a Kafka topic, because Kafka gives the user the flexibility to define the
timestamp for each message. However, the above API returns the first offset (from
the beginning of a topic-partition) whose timestamp is *greater than or equal to*
the requested timestamp. Thus, we cannot support pushdown on `less_than` or
`less_than_or_equal_to`, because a lesser timestamp may exist beyond the computed
`endOffset`.
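As an illustration, a minimal sketch (assuming an already-configured `KafkaConsumer`; the helper name is made up) of deriving a partition's `startOffset` from a timestamp condition:
```
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class TimestampPushdownSketch {
  // Returns the first offset whose timestamp is >= the requested timestamp,
  // or -1 when no such message exists in the partition.
  static long startOffsetForTimestamp(KafkaConsumer<?, ?> consumer,
                                      TopicPartition tp, long timestamp) {
    Map<TopicPartition, OffsetAndTimestamp> result =
        consumer.offsetsForTimes(Collections.singletonMap(tp, timestamp));
    OffsetAndTimestamp oat = result.get(tp);
    // A null entry means every message in the partition has a smaller
    // timestamp (or has expired) -- one of the corner cases handled below.
    return oat == null ? -1 : oat.offset();
  }
}
```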
### Pushdown Offset
Conditions on `kafkaMsgOffset` will be pushed down for the `equal`,
`greater_than`, `greater_than_or_equal_to`, `less_than` and
`less_than_or_equal_to` functions.
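A minimal sketch of how such a condition could narrow a partition's offset range (a hypothetical helper, not the PR's actual code; `end` is treated as exclusive here):
```
public class OffsetPushdownSketch {
  // Hypothetical helper: narrow a partition's [start, end) range for a
  // condition "kafkaMsgOffset <op> value".
  static long[] applyOffsetCondition(long start, long end, String op, long value) {
    switch (op) {
      case "equal":
        return new long[] {Math.max(start, value), Math.min(end, value + 1)};
      case "greater_than":
        return new long[] {Math.max(start, value + 1), end};
      case "greater_than_or_equal_to":
        return new long[] {Math.max(start, value), end};
      case "less_than":
        return new long[] {start, Math.min(end, value)};
      case "less_than_or_equal_to":
        return new long[] {start, Math.min(end, value + 1)};
      default:
        return new long[] {start, end};  // condition not pushed down
    }
  }
}
```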
### Pushdown Partition
Conditions on `kafkaPartitionId` help limit the number of partitions to be
scanned (useful for data exploration).
Even after creating the new scan specification based on the filter conditions,
we still have to re-apply the filters, because the optimization only restricts
the scan range. For example, a pushed-down `kafkaMsgTimestamp = t` condition only
moves `startOffset` to the first message whose timestamp is greater than or equal
to `t`; later messages in the range need not match and must still be filtered out.
### Pushdown with OR
At any level of the expression tree, an OR node will return a new scan
specification only if ALL of the conditions under the OR support pushdown.
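A minimal sketch of this rule (hypothetical names, reusing the scan-spec sketch above; `tryPushdown` is a stub standing in for the per-condition pushdown logic):
```
import java.util.ArrayList;
import java.util.List;

public class OrPushdownSketch {
  // Returns the merged scan specs for an OR node, or null when the OR cannot
  // be pushed down because at least one child condition is not pushable.
  static List<KafkaPartitionScanSpec> pushdownOr(List<Object> children) {
    List<KafkaPartitionScanSpec> merged = new ArrayList<>();
    for (Object child : children) {
      List<KafkaPartitionScanSpec> specs = tryPushdown(child);
      if (specs == null) {
        return null;  // ALL children must support pushdown, else give up
      }
      merged.addAll(specs);  // union of the children's scan ranges
    }
    return merged;
  }

  // Stub: would translate one child condition into per-partition scan specs,
  // or return null when that condition cannot be pushed down.
  static List<KafkaPartitionScanSpec> tryPushdown(Object condition) {
    return null;
  }
}
```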
## Handling corner-cases
Kafka supports `Time-To-Live` (retention), so messages can expire at some point
in time. It is therefore possible for queries to include expired or non-existent
values for timestamps and offsets. The implementation handles these cases as
well.
## Unit Tests
Unit tests have been added, covering queries both where pushdown is supported
and where it is not.
```
Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 158.327 sec - in org.apache.drill.exec.store.kafka.KafkaFilterPushdownTest
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> predicate pushdown support kafkaMsgOffset
> -----------------------------------------
>
> Key: DRILL-5977
> URL: https://issues.apache.org/jira/browse/DRILL-5977
> Project: Apache Drill
> Issue Type: Improvement
> Reporter: B Anil Kumar
> Assignee: Abhishek Ravi
> Priority: Major
> Fix For: 1.14.0
>
>
> As part of Kafka storage plugin review, below is the suggestion from Paul.
> {noformat}
> Does it make sense to provide a way to select a range of messages: a starting
> point or a count? Perhaps I want to run my query every five minutes, scanning
> only those messages since the previous scan. Or, I want to limit my take to,
> say, the next 1000 messages. Could we use a pseudo-column such as
> "kafkaMsgOffset" for that purpose? Maybe
> SELECT * FROM <some topic> WHERE kafkaMsgOffset > 12345
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)