[ https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480062#comment-16480062 ]

ASF GitHub Bot commented on DRILL-5977:
---------------------------------------

aravi5 opened a new pull request #1272: DRILL-5977: Filter Pushdown in 
Drill-Kafka plugin
URL: https://github.com/apache/drill/pull/1272
 
 
   # Implementation of Filter Pushdown in Drill-Kafka plugin
   
   This PR contains changes that implement the `Filter Pushdown` feature for conditions on `metadata fields` such as *Timestamps*, *Offsets* and *Partitions*. Below is a high-level description to help the reviewers.
   
   [Design document is available 
here](https://docs.google.com/document/d/1SZ4wO4Ii4nAHwgWbY6JJynPOseCha0DBBZJ2ig1RO0M/edit#heading=h.uvq8jbet5xdh)
   
   ## Topics, Partitions, Offsets and Timestamps
   
   Apache Kafka is a distributed `publisher-subscriber` system. Kafka consists of `Topics`, which are analogous to channels: Producers produce to topics and Consumers subscribe to topics. In the Drill world, every topic is treated as a `Table`.
   
   To achieve load balancing and parallelism, Kafka introduces the concept of `Partitions`. Each topic can have multiple partitions, and messages produced to a topic are distributed across its partitions (producers can also specify the partition to produce to).
   
   Kafka is essentially a distributed log, so every message within a `Topic-Partition` is identified by a unique `Offset`. This offset represents the distance of a message from the _beginning_ of the topic-partition.
   
   Since Kafka can be used to store events / event logs, one would look to Kafka to find out when an event occurred. Every message is therefore associated with a `Timestamp`. Kafka allows the producer (user application) to specify the timestamp value, or it can attach its own (server / broker) timestamp before storing the message / log in the topic-partition. Note that because the user can specify the timestamp value, timestamps can appear out of order as we consume from beginning to end.
   
   ## Drill-Kafka Plugin
   
   The Drill-Kafka plugin is very useful for exploring data and performing analytics. Currently, all of the data residing in a topic has to be consumed by Drill before filters are applied. However, it is possible to reduce the number of messages read from Kafka based on conditions provided on the fields `kafkaMsgTimestamp`, `kafkaMsgOffset` and `kafkaPartitionId`.
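
   For example, for a query such as `SELECT * FROM <some topic> WHERE kafkaMsgOffset > 12345` (quoted from the suggestion below), Drill only needs to read messages past offset 12345 in each partition instead of the whole topic.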
   
   The scan specification for the Drill-Kafka plugin is a collection of specifications for the individual partitions within each topic. Each per-partition specification consists of `topicName`, `partitionId`, `startOffset` and `endOffset`.
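
   A minimal sketch of such a per-partition specification (the field names come from the description above; the class shape is illustrative, not the PR's exact code):
   ```
   // Illustrative per-partition scan specification. Field names follow the
   // description above; the class shape is a sketch, not the PR's code.
   public class KafkaPartitionScanSpec {
     private final String topicName; // Kafka topic, exposed as a Drill table
     private final int partitionId;  // partition within the topic
     private long startOffset;       // first offset to read
     private long endOffset;         // offset at which to stop reading

     public KafkaPartitionScanSpec(String topicName, int partitionId,
                                   long startOffset, long endOffset) {
       this.topicName = topicName;
       this.partitionId = partitionId;
       this.startOffset = startOffset;
       this.endOffset = endOffset;
     }
   }
   ```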
   
   ## Filter Pushdown Implementation
   
   The filter pushdown implementation is governed by the idea that conditions on `metadata fields` can be translated into adjustments of the `startOffset` and `endOffset` for each partition, thus reducing the amount of data read.
   
   ### Pushdown Timestamp
   A common query against an event-log system involves conditions on the timestamp field. For the Drill-Kafka plugin, the `kafkaMsgTimestamp` field maps to the timestamp stored as part of each message in Kafka. Pushdown is supported for the following compare functions: `equal`, `greater_than` and `greater_than_or_equal_to`.
   
   Kafka exposes a `Consumer API` to obtain the earliest offset for a given timestamp value:
   ```
   public java.util.Map<TopicPartition,OffsetAndTimestamp> 
offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch)
   ```
   This API is used to figure out the `startOffset` for each partition in a topic. Note that the timestamps may not appear in increasing order when reading from a Kafka topic, because Kafka gives the user the flexibility to define the timestamp of a message. However, the above API returns the first offset (from the beginning of a topic-partition) whose timestamp is *greater than or equal to* the requested timestamp. Thus, we cannot support pushdown of `less_than` or `less_than_or_equal_to`, because a message with a lesser timestamp may exist beyond the computed `endOffset`.
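
   A minimal sketch, assuming an already-configured `KafkaConsumer` and an illustrative topic name, of how `offsetsForTimes` yields the new `startOffset` per partition for a condition such as `kafkaMsgTimestamp >= searchTs`:
   ```
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
   import org.apache.kafka.common.PartitionInfo;
   import org.apache.kafka.common.TopicPartition;

   public class StartOffsetsForTimestamp {
     // Returns, per partition, the first offset whose timestamp is >= searchTs.
     // Partitions with no such message come back as null from offsetsForTimes
     // and are skipped here (there is nothing in range to scan for them).
     static Map<TopicPartition, Long> startOffsets(
         KafkaConsumer<?, ?> consumer, String topic, long searchTs) {
       Map<TopicPartition, Long> search = new HashMap<>();
       for (PartitionInfo p : consumer.partitionsFor(topic)) {
         search.put(new TopicPartition(p.topic(), p.partition()), searchTs);
       }
       Map<TopicPartition, Long> starts = new HashMap<>();
       for (Map.Entry<TopicPartition, OffsetAndTimestamp> e :
           consumer.offsetsForTimes(search).entrySet()) {
         if (e.getValue() != null) {
           starts.put(e.getKey(), e.getValue().offset());
         }
       }
       return starts;
     }
   }
   ```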
   
   
   ### Pushdown Offset
   Conditions on `kafkaMsgOffset` will be pushed down for `equal`, 
`greater_than`, `greater_than_or_equal_to`, `less_than` and 
`less_than_or_equal_to` functions.
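
   A sketch of that translation for a single partition (the inclusive-start / exclusive-end range convention here is an illustrative assumption, not necessarily the PR's):
   ```
   public class OffsetConditionPushdown {
     // Narrows an existing [start, end) offset range according to one
     // pushed-down condition on kafkaMsgOffset. Assumes start is inclusive
     // and end is exclusive; an empty or inverted range means no rows match.
     static long[] apply(long start, long end, String compareFn, long value) {
       switch (compareFn) {
         case "equal":
           return new long[] {Math.max(start, value), Math.min(end, value + 1)};
         case "greater_than":
           return new long[] {Math.max(start, value + 1), end};
         case "greater_than_or_equal_to":
           return new long[] {Math.max(start, value), end};
         case "less_than":
           return new long[] {start, Math.min(end, value)};
         case "less_than_or_equal_to":
           return new long[] {start, Math.min(end, value + 1)};
         default:
           return new long[] {start, end}; // condition not pushed down
       }
     }
   }
   ```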
   
   ### Pushdown Partition
   Conditions on `kafkaPartitionId` help limit the number of partitions to be scanned (useful for data exploration).
   
   Even after creating the new scan specification based on the filter conditions, we still have to re-apply the filters, because the optimization only restricts the scan range.
   
   ### Pushdown with OR
   At any level of the expression tree, an OR node will return a new scan specification only if ALL conditions under the OR support pushdown.
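
   A sketch of that rule, with hypothetical `ScanSpec` / `ExprNode` types (a `null` result standing for "cannot push down"):
   ```
   import java.util.List;

   // Hypothetical types for illustration; these are not the PR's classes.
   interface ScanSpec { ScanSpec union(ScanSpec other); }
   interface ExprNode { ScanSpec toScanSpec(); } // null => not pushable

   public class OrPushdown {
     // An OR node yields a scan spec only if ALL of its children do;
     // one non-pushable child forces a full scan for the whole OR subtree.
     static ScanSpec evaluateOr(List<ExprNode> children) {
       ScanSpec merged = null;
       for (ExprNode child : children) {
         ScanSpec childSpec = child.toScanSpec();
         if (childSpec == null) {
           return null; // cannot push down this OR at all
         }
         merged = (merged == null) ? childSpec : merged.union(childSpec);
       }
       return merged;
     }
   }
   ```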
   
   
   ## Handling corner-cases
   Kafka supports `Time-To-Live` (retention), so messages can expire at some point in time. It is therefore possible for queries to reference expired or non-existent timestamp and offset values. The implementation handles these cases as well.
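
   One way to handle such values, sketched here as an assumption about the approach rather than the PR's exact code, is to clamp the requested range to the offsets the broker still retains, using the consumer's `beginningOffsets` and `endOffsets` APIs:
   ```
   import java.util.Collections;
   import java.util.Map;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;

   public class OffsetClamp {
     // Clamps a requested [start, end) range to what the broker still retains:
     // offsets below beginningOffsets() have expired; offsets at or above
     // endOffsets() do not exist yet. A collapsed range means nothing to read.
     static long[] clamp(KafkaConsumer<?, ?> consumer, TopicPartition tp,
                         long start, long end) {
       long earliest = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
       long latest = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
       long lo = Math.max(start, earliest);
       long hi = Math.min(end, latest);
       return new long[] {lo, Math.max(lo, hi)};
     }
   }
   ```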
   
   
   ## Unit Test
   Unit tests are added, covering queries both where pushdown is supported and where it is not.
   ```
   Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 158.327 sec 
- in org.apache.drill.exec.store.kafka.KafkaFilterPushdownTest
   ```



> predicate pushdown support kafkaMsgOffset
> -----------------------------------------
>
>                 Key: DRILL-5977
>                 URL: https://issues.apache.org/jira/browse/DRILL-5977
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: B Anil Kumar
>            Assignee: Abhishek Ravi
>            Priority: Major
>             Fix For: 1.14.0
>
>
> As part of Kafka storage plugin review, below is the suggestion from Paul.
> {noformat}
> Does it make sense to provide a way to select a range of messages: a starting 
> point or a count? Perhaps I want to run my query every five minutes, scanning 
> only those messages since the previous scan. Or, I want to limit my take to, 
> say, the next 1000 messages. Could we use a pseudo-column such as 
> "kafkaMsgOffset" for that purpose? Maybe
> SELECT * FROM <some topic> WHERE kafkaMsgOffset > 12345
> {noformat}


