DRILL-4779: Kafka storage plugin (Kamesh Bhallamudi & Anil Kumar Batchu)

closes #1052


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d3f8da2b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d3f8da2b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d3f8da2b

Branch: refs/heads/master
Commit: d3f8da2b62b13be28c4133980067942cec2a5faf
Parents: 05d8b3c
Author: Anil Kumar Batchu <akumarb2...@gmail.com>
Authored: Mon Nov 27 18:44:55 2017 -0800
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Tue Nov 28 11:27:18 2017 +0200

----------------------------------------------------------------------
 contrib/pom.xml                                 |   1 +
 contrib/storage-kafka/README.md                 | 230 +++++++++++++
 contrib/storage-kafka/pom.xml                   | 101 ++++++
 .../drill/exec/store/kafka/KafkaGroupScan.java  | 319 +++++++++++++++++++
 .../exec/store/kafka/KafkaRecordReader.java     | 145 +++++++++
 .../exec/store/kafka/KafkaScanBatchCreator.java |  55 ++++
 .../drill/exec/store/kafka/KafkaScanSpec.java   |  40 +++
 .../exec/store/kafka/KafkaStoragePlugin.java    | 100 ++++++
 .../store/kafka/KafkaStoragePluginConfig.java   |  78 +++++
 .../drill/exec/store/kafka/KafkaSubScan.java    | 177 ++++++++++
 .../drill/exec/store/kafka/MessageIterator.java | 114 +++++++
 .../drill/exec/store/kafka/MetaDataField.java   |  37 +++
 .../store/kafka/decoders/JsonMessageReader.java | 104 ++++++
 .../store/kafka/decoders/MessageReader.java     |  45 +++
 .../kafka/decoders/MessageReaderFactory.java    |  63 ++++
 .../drill/exec/store/kafka/package-info.java    |  24 ++
 .../store/kafka/schema/KafkaMessageSchema.java  |  86 +++++
 .../store/kafka/schema/KafkaSchemaFactory.java  |  45 +++
 .../resources/bootstrap-storage-plugins.json    |   9 +
 .../src/main/resources/drill-module.conf        |  30 ++
 .../exec/store/kafka/KafkaMessageGenerator.java | 134 ++++++++
 .../exec/store/kafka/KafkaQueriesTest.java      | 109 +++++++
 .../drill/exec/store/kafka/KafkaTestBase.java   |  92 ++++++
 .../exec/store/kafka/MessageIteratorTest.java   | 106 ++++++
 .../drill/exec/store/kafka/QueryConstants.java  |  40 +++
 .../drill/exec/store/kafka/TestKafkaSuit.java   | 107 +++++++
 .../kafka/cluster/EmbeddedKafkaCluster.java     | 166 ++++++++++
 .../decoders/MessageReaderFactoryTest.java      |  67 ++++
 .../storage-kafka/src/test/resources/login.conf |  25 ++
 distribution/pom.xml                            |   5 +
 distribution/src/assemble/bin.xml               |   1 +
 .../org/apache/drill/exec/ExecConstants.java    |  12 +
 .../server/options/SystemOptionManager.java     |   4 +
 .../src/main/resources/drill-module.conf        |   4 +
 .../apache/drill/exec/proto/UserBitShared.java  |   9 +
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 37 files changed, 2688 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index d4ad434..8588987 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -37,6 +37,7 @@
     <module>storage-hive</module>
     <module>storage-mongo</module>
     <module>storage-jdbc</module>
+    <module>storage-kafka</module>
     <module>storage-kudu</module>
     <module>storage-opentsdb</module>
     <module>sqlline</module>

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/README.md
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/README.md b/contrib/storage-kafka/README.md
new file mode 100644
index 0000000..a63731f
--- /dev/null
+++ b/contrib/storage-kafka/README.md
@@ -0,0 +1,230 @@
+# Drill Kafka Plugin
+
+The Drill Kafka storage plugin lets you run interactive SQL analysis against Apache Kafka.
+
+<h4 id="Supported kafka versions">Supported Kafka Version</h4>
+Kafka-0.10 and above </p>
+
+<h4 id="Supported Message Formats">Message Formats</h4>
+Currently, this plugin supports reading only <strong>JSON</strong> Kafka messages.
+
+
+<h4>Message Readers</h4>
+<p>Message readers decode the messages fetched from Kafka. The following message readers are currently supported:</p>
+
+<table style="width:100%">
+  <tr>
+    <th>MessageReader</th>
+    <th>Description</th>
+    <th>Key Deserializer</th>
+    <th>Value Deserializer</th>
+  </tr>
+  <tr>
+    <td>JsonMessageReader</td>
+    <td>Reads JSON messages</td>
+    <td>org.apache.kafka.common.serialization.ByteArrayDeserializer</td> 
+    <td>org.apache.kafka.common.serialization.ByteArrayDeserializer</td>
+  </tr>
+</table>
+
+
+<h4 id="Plugin Configurations">Plugin Configurations</h4>
+The Drill Kafka plugin supports the following properties:
+<ul>
+  <li><strong>kafkaConsumerProps</strong>: These are typical <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka consumer properties</a>.</li>
+  <li><strong>System options</strong>: These are Drill Kafka plugin system options.
+    <ul>
+      <li><strong>store.kafka.record.reader</strong>: Message reader implementation used to decode messages read from Kafka. The default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader.</li>
+      <li><strong>store.kafka.poll.timeout</strong>: Polling timeout used by the Kafka client while fetching messages from the Kafka cluster. The default is 200 milliseconds.</li>
+    </ul>
+  </li>
+</ul>
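+
+<p>As a quick sanity check of the <code>kafkaConsumerProps</code> before registering the plugin, the same properties can be fed straight to a plain Kafka consumer. The following is a minimal sketch using the kafka-clients API; the broker address and group id are placeholders, not part of this patch:</p>
+
+```java
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+public class ConsumerPropsCheck {
+  public static void main(String[] args) {
+    // The same key/value pairs that go into the plugin's "kafkaConsumerProps" section
+    Properties props = new Properties();
+    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
+    props.put("group.id", "drill-query-consumer-1");  // placeholder group id
+
+    // The plugin reads raw bytes and lets the configured MessageReader decode
+    // them, hence ByteArrayDeserializer for both key and value.
+    try (KafkaConsumer<byte[], byte[]> consumer =
+        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+      // Every topic listed here will surface as a SQL table in the kafka schema.
+      consumer.listTopics().keySet().forEach(System.out::println);
+    }
+  }
+}
+```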
+
+<h4 id="Plugin Registration">Plugin Registration</h4>
+To register the Kafka plugin, open the Drill web interface by entering <strong>http://drillbit:8047/storage</strong> in your browser.
+
+<p>The following is an example plugin registration configuration:</p>
+<pre>
+{
+  "type": "kafka",
+  "kafkaConsumerProps": {
+    "key.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "auto.offset.reset": "earliest",
+    "bootstrap.servers": "localhost:9092",
+    "group.id": "drill-query-consumer-1",
+    "enable.auto.commit": "true",
+    "value.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "session.timeout.ms": "30000"
+  },
+  "enabled": true
+}
+</pre>
+
+<h4 id="Abstraction"> Abstraction </h4>
+<p>In Drill, each Kafka topic maps to a SQL table. When a query is issued against a table, the plugin scans all messages from the earliest to the latest offset of that topic at that point in time. Topics (tables) are discovered automatically, so you can perform analysis without executing DDL statements.</p>
+
+<h4 id="Mapping">MetaData</h4>
+This plugin also fetches the additional information about each message. The 
following additional fields are supported as now
+<ul>
+  <li>kafkaTopic</li>
+  <li>kafkaPartitionId</li>
+  <li>kafkaMsgOffset</li>
+  <li>kafkaMsgTimestamp</li>
+  <li>kafkaMsgKey (only if it is not null)</li>
+</ul>
+
+<h4 id="Examples"> Examples </h4>
+
+Kafka topics and message offsets
+
+```
+$ bin/kafka-topics --list --zookeeper localhost:2181
+clicks
+clickstream
+clickstream-json-demo
+
+$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream-json-demo --from-beginning | more
+{"userID":"055e9af4-8c3c-4834-8482-8e05367a7bef","sessionID":"7badf08e-1e1d-4aeb-b853-7df2df4431ac","pageName":"shoes","refferalUrl":"yelp","ipAddress":"20.44.183.126","userAgent":"Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1","client_ts":1509926023099}
+{"userID":"a29454b3-642d-481e-9dd8-0e0d7ef32ef5","sessionID":"b4a89204-b98c-4b4b-a1a9-f28f22d5ead3","pageName":"books","refferalUrl":"yelp","ipAddress":"252.252.113.190","userAgent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36 OPR/38.0.2220.41","client_ts":1509926023100}
+{"userID":"8c53b1c6-da47-4b5a-989d-61b5594f3a1d","sessionID":"baae3a1d-25b2-4955-8d07-20191f29ab32","pageName":"login","refferalUrl":"yelp","ipAddress":"110.170.214.255","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0","client_ts":1509926023100}
+
+$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -2
+clickstream-json-demo:2:2765000
+clickstream-json-demo:1:2765000
+clickstream-json-demo:3:2765000
+clickstream-json-demo:0:2765000
+
+$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -1
+clickstream-json-demo:2:2765245
+clickstream-json-demo:1:2765245
+clickstream-json-demo:3:2765245
+clickstream-json-demo:0:2765245
+
+
+```
+
+
+Drill queries on Kafka
+
+```
+$ bin/sqlline -u jdbc:drill:zk=localhost:2181
+Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
+apache drill 1.12.0-SNAPSHOT
+"json ain't no thang"
+0: jdbc:drill:zk=localhost:2181> use kafka;
++-------+------------------------------------+
+|  ok   |              summary               |
++-------+------------------------------------+
+| true  | Default schema changed to [kafka]  |
++-------+------------------------------------+
+1 row selected (0.564 seconds)
+0: jdbc:drill:zk=localhost:2181> show tables;
++---------------+------------------------------+
+| TABLE_SCHEMA  |          TABLE_NAME          |
++---------------+------------------------------+
+| kafka         | clickstream-json-demo        |
+| kafka         | clickstream                  |
+| kafka         | clicks                       |
++---------------+------------------------------+
+17 rows selected (1.908 seconds)
+0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.poll.timeout` = 200;
++-------+------------------------------------+
+|  ok   |              summary               |
++-------+------------------------------------+
+| true  | store.kafka.poll.timeout updated.  |
++-------+------------------------------------+
+1 row selected (0.102 seconds)
+0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';
++-------+-------------------------------------+
+|  ok   |               summary               |
++-------+-------------------------------------+
+| true  | store.kafka.record.reader updated.  |
++-------+-------------------------------------+
+1 row selected (0.082 seconds)
+0: jdbc:drill:zk=localhost:2181> select * from kafka.`clickstream-json-demo` limit 2;
++---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
+|                userID                 |               sessionID               |  pageName   | refferalUrl  |    ipAddress     |                                    userAgent                                      |   client_ts    |       kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  |
++---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
+| 6b55a8fa-d0fd-41f0-94e3-7f6b551cdede  | e3bd34a8-b546-4cd5-a0c6-5438589839fc  | categories  | bing         | 198.105.119.221  | Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0  | 1509926023098  | clickstream-json-demo  | 2                 | 2765000         | 1509926023098      |
+| 74cffc37-2df0-4db4-aff9-ed0027a12d03  | 339e3821-5254-4d79-bbae-69bc12808eca  | furniture   | bing         | 161.169.50.60    | Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0     | 1509926023099  | clickstream-json-demo  | 2                 | 2765001         | 1509926023099      |
++---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
+2 rows selected (1.18 seconds)
+0: jdbc:drill:zk=localhost:2181> select count(*) from kafka.`clickstream-json-demo`;
++---------+
+| EXPR$0  |
++---------+
+| 980     |
++---------+
+1 row selected (0.732 seconds)
+0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, MIN(kafkaMsgOffset) as minOffset, MAX(kafkaMsgOffset) as maxOffset from kafka.`clickstream-json-demo` group by kafkaPartitionId;
++-------------------+------------+------------+
+| kafkaPartitionId  | minOffset  | maxOffset  |
++-------------------+------------+------------+
+| 2                 | 2765000    | 2765244    |
+| 1                 | 2765000    | 2765244    |
+| 3                 | 2765000    | 2765244    |
+| 0                 | 2765000    | 2765244    |
++-------------------+------------+------------+
+4 rows selected (3.081 seconds)
+0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, from_unixtime(MIN(kafkaMsgTimestamp)/1000) as minKafkaTS, from_unixtime(MAX(kafkaMsgTimestamp)/1000) as maxKafkaTs from kafka.`clickstream-json-demo` group by kafkaPartitionId;
++-------------------+----------------------+----------------------+
+| kafkaPartitionId  |      minKafkaTS      |      maxKafkaTs      |
++-------------------+----------------------+----------------------+
+| 2                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+| 1                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+| 3                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+| 0                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
++-------------------+----------------------+----------------------+
+4 rows selected (2.758 seconds)
+0: jdbc:drill:zk=localhost:2181> select distinct(refferalUrl) from kafka.`clickstream-json-demo`;
++--------------+
+| refferalUrl  |
++--------------+
+| bing         |
+| yahoo        |
+| yelp         |
+| google       |
++--------------+
+4 rows selected (2.944 seconds)
+0: jdbc:drill:zk=localhost:2181> select pageName, count(*) from kafka.`clickstream-json-demo` group by pageName;
++--------------+---------+
+|   pageName   | EXPR$1  |
++--------------+---------+
+| categories   | 89      |
+| furniture    | 89      |
+| mobiles      | 89      |
+| clothing     | 89      |
+| sports       | 89      |
+| offers       | 89      |
+| shoes        | 89      |
+| books        | 89      |
+| login        | 90      |
+| electronics  | 89      |
+| toys         | 89      |
++--------------+---------+
+11 rows selected (2.493 seconds)
+
+```
+
+
+Note:
+
+- The store.kafka.record.reader system option selects the record reader; the default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader.
+- The default store.kafka.poll.timeout is 200 milliseconds; adjust it to suit your cluster.
+- A custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordingly, as in the sketch below.
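+
+A custom reader might look like the following sketch. The method names mirror how KafkaRecordReader drives a MessageReader elsewhere in this patch (init, readMessage, ensureAtLeastOneField, getConsumer, close); the exact signatures should be taken from MessageReader.java, and the package and class names here are hypothetical:
+
+```java
+package com.example.kafka; // hypothetical package and class
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import io.netty.buffer.DrillBuf;
+
+public class CsvMessageReader implements MessageReader {
+
+  private VectorContainerWriter writer;
+
+  @Override
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer,
+      boolean allTextMode, boolean readNumbersAsDouble) {
+    this.writer = writer; // decoded fields are written through this vector writer
+  }
+
+  @Override
+  public void readMessage(ConsumerRecord<?, ?> record) {
+    byte[] value = (byte[]) record.value();
+    // split the CSV payload here and write one row through the writer
+  }
+
+  @Override
+  public void ensureAtLeastOneField() {
+    // make sure batches that saw no rows still carry a schema
+  }
+
+  @Override
+  public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
+    // register the consumer with the plugin so it is closed when the plugin closes
+    return plugin.registerConsumer(new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+  }
+
+  @Override
+  public void close() throws IOException {
+    // release any decoder state
+  }
+}
+```
+
+Once the class is on the drillbit classpath, ALTER SESSION SET `store.kafka.record.reader` = 'com.example.kafka.CsvMessageReader'; selects it.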
+
+
+For the JSON message format, the following system options can be used. More details can be found in the [Drill JSON Model](https://drill.apache.org/docs/json-data-model/) documentation and in the [Drill system options introduction](https://drill.apache.org/docs/configuration-options-introduction/):
+
+<ul>
+  <li>ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';</li>
+  <li>ALTER SESSION SET `store.kafka.poll.timeout` = 200;</li>
+  <li>ALTER SESSION SET `exec.enable_union_type` = true;</li>
+  <li>ALTER SESSION SET `store.kafka.all_text_mode` = true;</li>
+  <li>ALTER SESSION SET `store.kafka.read_numbers_as_double` = true;</li>
+</ul>
+
+<h4 id="RoadMap">RoadMap</h4>
+ <ul>
+   <li>AVRO Message format support</li>
+ </ul>

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
new file mode 100644
index 0000000..1357a4e
--- /dev/null
+++ b/contrib/storage-kafka/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.12.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-kafka</artifactId>
+  <name>contrib/kafka-storage-plugin</name>
+
+  <properties>
+    <kafka.version>0.11.0.1</kafka.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>3.3.0</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
new file mode 100644
index 0000000..e08c7d7
--- /dev/null
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+
+  // Assumed average message size (1 KB), used to compute scan stats and
+  // work assignments.
+  private static final long MSG_SIZE = 1024;
+
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List<SchemaPath> columns;
+  private final KafkaScanSpec kafkaScanSpec;
+
+  private List<PartitionScanWork> partitionWorkList;
+  private ListMultimap<Integer, PartitionScanWork> assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+      @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+      @JsonProperty("columns") List<SchemaPath> columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+      @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec 
kafkaScanSpec, List<SchemaPath> columns) {
+    super(StringUtils.EMPTY);
+    this.kafkaStoragePlugin = kafkaStoragePlugin;
+    this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+    this.columns = columns;
+    this.kafkaScanSpec = kafkaScanSpec;
+    init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig 
kafkaStoragePluginConfig, List<SchemaPath> columns,
+      KafkaScanSpec kafkaScanSpec, KafkaStoragePlugin pluginRegistry) {
+    super(userName);
+    this.kafkaStoragePluginConfig = kafkaStoragePluginConfig;
+    this.columns = columns;
+    this.kafkaScanSpec = kafkaScanSpec;
+    this.kafkaStoragePlugin = pluginRegistry;
+    init();
+  }
+
+  public KafkaGroupScan(KafkaGroupScan that) {
+    super(that);
+    this.kafkaStoragePluginConfig = that.kafkaStoragePluginConfig;
+    this.columns = that.columns;
+    this.kafkaScanSpec = that.kafkaScanSpec;
+    this.kafkaStoragePlugin = that.kafkaStoragePlugin;
+    this.partitionWorkList = that.partitionWorkList;
+    this.assignments = that.assignments;
+  }
+
+  private static class PartitionScanWork implements CompleteWork {
+
+    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+
+    private final TopicPartition topicPartition;
+    private final long beginOffset;
+    private final long latestOffset;
+
+    public PartitionScanWork(TopicPartition topicPartition, long beginOffset, 
long latestOffset) {
+      this.topicPartition = topicPartition;
+      this.beginOffset = beginOffset;
+      this.latestOffset = latestOffset;
+    }
+
+    public TopicPartition getTopicPartition() {
+      return topicPartition;
+    }
+
+    public long getBeginOffset() {
+      return beginOffset;
+    }
+
+    public long getLatestOffset() {
+      return latestOffset;
+    }
+
+    @Override
+    public int compareTo(CompleteWork o) {
+      return Long.compare(getTotalBytes(), o.getTotalBytes());
+    }
+
+    @Override
+    public long getTotalBytes() {
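+      // Estimated slice size: message count in this slice times the assumed 1 KB average (MSG_SIZE).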
+      return (latestOffset - beginOffset) * MSG_SIZE;
+    }
+
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
+    }
+
+  }
+
+  /**
+   * Computes work per topic partition, based on start and end offset of each
+   * corresponding topicPartition
+   */
+  private void init() {
+    partitionWorkList = Lists.newArrayList();
+    Collection<DrillbitEndpoint> endpoints = 
kafkaStoragePlugin.getContext().getBits();
+    Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
+    for (DrillbitEndpoint endpoint : endpoints) {
+      endpointMap.put(endpoint.getAddress(), endpoint);
+    }
+
+    Map<TopicPartition, Long> startOffsetsMap = Maps.newHashMap();
+    Map<TopicPartition, Long> endOffsetsMap = Maps.newHashMap();
+    List<PartitionInfo> topicPartitions = null;
+    String topicName = kafkaScanSpec.getTopicName();
+
+    try (KafkaConsumer<?, ?> kafkaConsumer = new 
KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+      if (!kafkaConsumer.listTopics().keySet().contains(topicName)) {
+        throw UserException.dataReadError()
+            .message("Table '%s' does not exist", topicName)
+            .build(logger);
+      }
+
+      kafkaConsumer.subscribe(Arrays.asList(topicName));
+      // Per the KafkaConsumer Javadoc, seekToBeginning/seekToEnd evaluate
+      // lazily: the seek to the first/last offset of all partitions happens
+      // only when poll(long) or position(TopicPartition) is called.
+      kafkaConsumer.poll(0);
+      Set<TopicPartition> assignments = kafkaConsumer.assignment();
+      topicPartitions = kafkaConsumer.partitionsFor(topicName);
+
+      // fetch start offsets for each topicPartition
+      kafkaConsumer.seekToBeginning(assignments);
+      for (TopicPartition topicPartition : assignments) {
+        startOffsetsMap.put(topicPartition, 
kafkaConsumer.position(topicPartition));
+      }
+
+      // fetch end offsets for each topicPartition
+      kafkaConsumer.seekToEnd(assignments);
+      for (TopicPartition topicPartition : assignments) {
+        endOffsetsMap.put(topicPartition, 
kafkaConsumer.position(topicPartition));
+      }
+    } catch (Exception e) {
+      throw UserException.dataReadError(e).message("Failed to fetch start/end 
offsets of the topic  %s", topicName)
+          .addContext(e.getMessage()).build(logger);
+    }
+
+    // computes work for each end point
+    for (PartitionInfo partitionInfo : topicPartitions) {
+      TopicPartition topicPartition = new TopicPartition(topicName, 
partitionInfo.partition());
+      long lastCommittedOffset = startOffsetsMap.get(topicPartition);
+      long latestOffset = endOffsetsMap.get(topicPartition);
+      logger.debug("Latest offset of {} is {}", topicPartition, latestOffset);
+      logger.debug("Last committed offset of {} is {}", topicPartition, 
lastCommittedOffset);
+      PartitionScanWork work = new PartitionScanWork(topicPartition, 
lastCommittedOffset, latestOffset);
+      Node[] inSyncReplicas = partitionInfo.inSyncReplicas();
+      for (Node isr : inSyncReplicas) {
+        String host = isr.host();
+        DrillbitEndpoint ep = endpointMap.get(host);
+        if (ep != null) {
+          work.getByteMap().add(ep, work.getTotalBytes());
+        }
+      }
+      partitionWorkList.add(work);
+    }
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
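+    // Map each partition's work unit onto the minor fragments running on these endpoints.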
+    assignments = AssignmentCreator.getMappings(incomingEndpoints, 
partitionWorkList);
+  }
+
+  @Override
+  public KafkaSubScan getSpecificScan(int minorFragmentId) {
+    List<PartitionScanWork> workList = assignments.get(minorFragmentId);
+    List<KafkaSubScanSpec> scanSpecList = Lists.newArrayList();
+
+    for (PartitionScanWork work : workList) {
+      scanSpecList.add(new KafkaSubScanSpec(work.getTopicPartition().topic(), 
work.getTopicPartition().partition(),
+          work.getBeginOffset(), work.getLatestOffset()));
+    }
+
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
kafkaStoragePluginConfig, columns, scanSpecList);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return partitionWorkList.size();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    long messageCount = 0;
+    for (PartitionScanWork work : partitionWorkList) {
+      messageCount += (work.getLatestOffset() - work.getBeginOffset());
+    }
+    return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, messageCount, 1, 
messageCount * MSG_SIZE);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new KafkaGroupScan(this);
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(partitionWorkList);
+    }
+    return affinities;
+  }
+
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    KafkaGroupScan clone = new KafkaGroupScan(this);
+    clone.columns = columns;
+    return clone;
+  }
+
+  @JsonProperty("kafkaStoragePluginConfig")
+  public KafkaStoragePluginConfig getStorageConfig() {
+    return this.kafkaStoragePluginConfig;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("kafkaScanSpec")
+  public KafkaScanSpec getScanSpec() {
+    return kafkaScanSpec;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getStoragePlugin() {
+    return kafkaStoragePlugin;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", 
kafkaScanSpec, columns);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
new file mode 100644
index 0000000..f034a8a
--- /dev/null
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List<SchemaPath> projectedColumns,
+      FragmentContext context, KafkaStoragePlugin plugin) {
+    setColumns(projectedColumns);
+    this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+    this.readNumbersAsDouble = context.getOptions()
+        .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+    OptionManager options = context.getOptions();
+    this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
+    this.kafkaMsgReader = 
options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+    this.kafkaPollTimeOut = 
options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
+    this.plugin = plugin;
+    this.subScanSpec = subScanSpec;
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
projectedColumns) {
+    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    if (!isStarQuery()) {
+      for (SchemaPath column : projectedColumns) {
+        transformed.add(column);
+      }
+    } else {
+      transformed.add(Utilities.STAR_COLUMN);
+    }
+    return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+    this.writer = new VectorContainerWriter(output, unionEnabled);
+    messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
+    messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), this.writer,
+        this.enableAllTextMode, this.readNumbersAsDouble);
+    msgItr = new MessageIterator(messageReader.getConsumer(plugin), 
subScanSpec, kafkaPollTimeOut);
+  }
+
+  /**
+   * KafkaConsumer.poll fetches up to 500 messages per call, so hasNext takes
+   * care of polling multiple times while filling this batch.
+   */
+  @Override
+  public int next() {
+    writer.allocate();
+    writer.reset();
+    Stopwatch watch = Stopwatch.createStarted();
+    int messageCount = 0;
+
+    try {
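+      // The end offset is exclusive; the last message in this sub-scan slice sits at endOffset - 1.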
+      while (currentOffset < subScanSpec.getEndOffset() - 1 && 
msgItr.hasNext()) {
+        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+        currentOffset = consumerRecord.offset();
+        writer.setPosition(messageCount);
+        messageReader.readMessage(consumerRecord);
+        if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+          break;
+        }
+      }
+
+      messageReader.ensureAtLeastOneField();
+      writer.setValueCount(messageCount);
+      logger.debug("Took {} ms to process {} records.", 
watch.elapsed(TimeUnit.MILLISECONDS), messageCount);
+      logger.debug("Last offset consumed for {}:{} is {}", 
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+          currentOffset);
+      return messageCount;
+    } catch (Exception e) {
+      String msg = "Failure while reading messages from kafka. Recordreader 
was at record: " + (messageCount + 1);
+      throw 
UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    logger.info("Last offset processed for {}:{} is - {}", 
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+        currentOffset);
+    logger.info("Total time to fetch messages from {}:{} is - {} 
milliseconds", subScanSpec.getTopicName(),
+        subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
+    messageReader.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
new file mode 100644
index 0000000..026e341
--- /dev/null
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(FragmentContext context, KafkaSubScan 
subScan, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<SchemaPath> columns = subScan.getColumns() != null ? 
subScan.getColumns() : GroupScan.ALL_COLUMNS;
+
+    List<RecordReader> readers = 
Lists.newArrayListWithCapacity(subScan.getPartitionSubScanSpecList().size());
+    for (KafkaSubScan.KafkaSubScanSpec scanSpec : 
subScan.getPartitionSubScanSpecList()) {
+      readers.add(new KafkaRecordReader(scanSpec, columns, context, 
subScan.getKafkaStoragePlugin()));
+    }
+
+    logger.info("Number of record readers initialized : {}", readers.size());
+    return new ScanBatch(subScan, context, readers);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
new file mode 100644
index 0000000..91c8fdf
--- /dev/null
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class KafkaScanSpec {
+  private String topicName;
+
+  @JsonCreator
+  public KafkaScanSpec(@JsonProperty("topicName") String topicName) {
+    this.topicName = topicName;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaScanSpec [topicName=" + topicName + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
new file mode 100644
index 0000000..8986ea7
--- /dev/null
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Closer;
+
+public class KafkaStoragePlugin extends AbstractStoragePlugin {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePlugin.class);
+  private final KafkaSchemaFactory kafkaSchemaFactory;
+  private final KafkaStoragePluginConfig config;
+  private final DrillbitContext context;
+  private final Closer closer = Closer.create();
+
+  public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext 
context, String name)
+      throws ExecutionSetupException {
+    logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
+    this.config = config;
+    this.context = context;
+    this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public KafkaStoragePluginConfig getConfig() {
+    return this.config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+    this.kafkaSchemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public Set<StoragePluginOptimizerRule> 
getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName,
+      JSONOptions selection) throws IOException {
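+    // Deserialize the table selection (the topic name) into a KafkaScanSpec.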
+    KafkaScanSpec kafkaScanSpec = selection.getListWith(new ObjectMapper(),
+        new TypeReference<KafkaScanSpec>() {
+        });
+    return new KafkaGroupScan(this, kafkaScanSpec, null);
+  }
+
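+  // Consumers registered here are closed together with the plugin via the shared Closer.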
+  public KafkaConsumer<byte[], byte[]> registerConsumer(KafkaConsumer<byte[], 
byte[]> consumer) {
+    return closer.register(consumer);
+  }
+
+  @Override
+  public void close() throws IOException {
+    closer.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
new file mode 100644
index 0000000..94afa5f
--- /dev/null
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(KafkaStoragePluginConfig.NAME)
+public class KafkaStoragePluginConfig extends StoragePluginConfig {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePluginConfig.class);
+  public static final String NAME = "kafka";
+  private Properties kafkaConsumerProps;
+
+  @JsonCreator
+  public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") 
Map<String, String> kafkaConsumerProps) {
+    this.kafkaConsumerProps = new Properties();
+    this.kafkaConsumerProps.putAll(kafkaConsumerProps);
+    logger.debug("Kafka Consumer Props {}", this.kafkaConsumerProps);
+  }
+
+  public Properties getKafkaConsumerProps() {
+    return kafkaConsumerProps;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((kafkaConsumerProps == null) ? 0 : 
kafkaConsumerProps.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj;
+    if (kafkaConsumerProps == null && other.kafkaConsumerProps == null) {
+      return true;
+    }
+    if (kafkaConsumerProps == null || other.kafkaConsumerProps == null) {
+      return false;
+    }
+    return kafkaConsumerProps.equals(other.kafkaConsumerProps);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
new file mode 100644
index 0000000..fc110b5
--- /dev/null
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> columns;
+  private final List<KafkaSubScanSpec> partitions;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+      @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> 
partitions)
+      throws ExecutionSetupException {
+    super(userName);
+    this.kafkaStoragePluginConfig = kafkaStoragePluginConfig;
+    this.columns = columns;
+    this.partitions = partitions;
+    this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkaStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkaStoragePluginConfig,
+      List<SchemaPath> columns, List<KafkaSubScanSpec> partitionSubScanSpecList) {
+    super(userName);
+    this.columns = columns;
+    this.kafkaStoragePluginConfig = kafkaStoragePluginConfig;
+    this.kafkaStoragePlugin = plugin;
+    this.partitions = partitionSubScanSpecList;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, kafkaStoragePluginConfig, columns, partitions);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
+    return kafkaStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+    return kafkaStoragePlugin;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  public List<KafkaSubScanSpec> getPartitionSubScanSpecList() {
+    return partitions;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.KAFKA_SUB_SCAN_VALUE;
+  }
+
+  public static class KafkaSubScanSpec {
+    protected String topicName;
+    protected int partitionId;
+    protected long startOffset;
+    protected long endOffset;
+
+    @JsonCreator
+    public KafkaSubScanSpec(@JsonProperty("topicName") String topicName, @JsonProperty("partitionId") int partitionId,
+        @JsonProperty("startOffset") long startOffset, @JsonProperty("endOffset") long endOffset) {
+      this.topicName = topicName;
+      this.partitionId = partitionId;
+      this.startOffset = startOffset;
+      this.endOffset = endOffset;
+    }
+
+    KafkaSubScanSpec() {
+    }
+
+    public String getTopicName() {
+      return topicName;
+    }
+
+    public int getPartitionId() {
+      return partitionId;
+    }
+
+    public long getStartOffset() {
+      return startOffset;
+    }
+
+    public long getEndOffset() {
+      return endOffset;
+    }
+
+    public KafkaSubScanSpec setTopicName(String topicName) {
+      this.topicName = topicName;
+      return this;
+    }
+
+    public KafkaSubScanSpec setPartitionId(int partitionId) {
+      this.partitionId = partitionId;
+      return this;
+    }
+
+    public KafkaSubScanSpec setStartOffset(long startOffset) {
+      this.startOffset = startOffset;
+      return this;
+    }
+
+    public KafkaSubScanSpec setEndOffset(long endOffset) {
+      this.endOffset = endOffset;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "KafkaSubScanSpec [topicName=" + topicName + ", partitionId=" + 
partitionId + ", startOffset="
+          + startOffset + ", endOffset=" + endOffset + "]";
+    }
+
+  }
+
+}
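
For illustration, a sub-scan spec for a single partition slice can be built with the public constructor above; the topic name and offsets here are hypothetical:

    KafkaSubScanSpec spec = new KafkaSubScanSpec("clicks", 0, 100L, 200L);
    // topic "clicks", partition 0, offsets 100 (inclusive) up to 200 (exclusive)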

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
new file mode 100644
index 0000000..3afb1b8
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+import kafka.common.KafkaException;
+
+public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(MessageIterator.class);
+  private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private Iterator<ConsumerRecord<byte[], byte[]>> recordIter;
+  private final TopicPartition topicPartition;
+  private long totalFetchTime = 0;
+  private final long kafkaPollTimeOut;
+  private final long endOffset;
+
+  public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer, final KafkaSubScanSpec subScanSpec,
+      final long kafkaPollTimeOut) {
+    this.kafkaConsumer = kafkaConsumer;
+    this.kafkaPollTimeOut = kafkaPollTimeOut;
+
+    List<TopicPartition> partitions = Lists.newArrayListWithCapacity(1);
+    topicPartition = new TopicPartition(subScanSpec.getTopicName(), subScanSpec.getPartitionId());
+    partitions.add(topicPartition);
+    this.kafkaConsumer.assign(partitions);
+    logger.info("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+        subScanSpec.getStartOffset());
+    this.kafkaConsumer.seek(topicPartition, subScanSpec.getStartOffset());
+    this.endOffset = subScanSpec.getEndOffset();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Does not support remove operation");
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (recordIter != null && recordIter.hasNext()) {
+      return true;
+    }
+
+    long nextPosition = kafkaConsumer.position(topicPartition);
+    if (nextPosition >= endOffset) {
+      return false;
+    }
+
+    ConsumerRecords<byte[], byte[]> consumerRecords = null;
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    try {
+      consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
+    } catch (KafkaException ke) {
+      logger.error(ke.getMessage(), ke);
+      throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
+    }
+    stopwatch.stop();
+
+    if (consumerRecords.isEmpty()) {
+      String errorMsg = new StringBuilder().append("Failed to fetch messages within ").append(kafkaPollTimeOut)
+          .append(" milliseconds. Consider increasing the value of the property : ")
+          .append(ExecConstants.KAFKA_POLL_TIMEOUT).toString();
+      throw UserException.dataReadError().message(errorMsg).build(logger);
+    }
+
+    long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    logger.debug("Total number of messages fetched : {}", 
consumerRecords.count());
+    logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime);
+    totalFetchTime += lastFetchTime;
+
+    recordIter = consumerRecords.iterator();
+    return recordIter.hasNext();
+  }
+
+  public long getTotalFetchTime() {
+    return this.totalFetchTime;
+  }
+
+  @Override
+  public ConsumerRecord<byte[], byte[]> next() {
+    return recordIter.next();
+  }
+}
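
A minimal usage sketch of MessageIterator, assuming a reachable broker and the relevant Kafka and Drill imports plus java.util.Properties; the broker address, topic, offsets and poll timeout are hypothetical. Note that hasNext() raises a UserException when a poll returns no messages within the timeout:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
    props.put("group.id", "drill-consumer");
    KafkaConsumer<byte[], byte[]> consumer =
        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    MessageIterator it = new MessageIterator(consumer,
        new KafkaSubScanSpec("clicks", 0, 100L, 200L), 200 /* poll timeout in ms */);
    while (it.hasNext()) {
      ConsumerRecord<byte[], byte[]> record = it.next(); // decode record.value() here
    }
    consumer.close();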

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
new file mode 100644
index 0000000..cdaee9b
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+/**
+ * MetaData fields provide additional information about each message.
+ * The fieldName of each constant must not be modified, since doing so breaks
+ * backward compatibility.
+ */
+public enum MetaDataField {
+  KAFKA_TOPIC("kafkaTopic"),
+  KAFKA_PARTITION_ID("kafkaPartitionId"),
+  KAFKA_OFFSET("kafkaMsgOffset"),
+  KAFKA_TIMESTAMP("kafkaMsgTimestamp"),
+  KAFKA_MSG_KEY("kafkaMsgKey");
+
+  private final String fieldName;
+
+  private MetaDataField(final String fieldName) {
+    this.fieldName = fieldName;
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+}
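
Each constant maps to the column name under which that piece of metadata surfaces in query results, for example:

    String offsetColumn = MetaDataField.KAFKA_OFFSET.getFieldName(); // "kafkaMsgOffset"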

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
new file mode 100644
index 0000000..9ad6107
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_MSG_KEY;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_OFFSET;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_PARTITION_ID;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TIMESTAMP;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TOPIC;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader implementation that converts each Kafka ConsumerRecord into
+ * JSON and writes it to the VectorContainerWriter of a JsonReader.
+ */
+public class JsonMessageReader implements MessageReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  @Override
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
+      boolean readNumbersAsDouble) {
+    // skipOuterList is set to false because it applies only to JSON files, not to individual JSON records.
+    this.jsonReader = new JsonReader(buf, columns, allTextMode, false, readNumbersAsDouble);
+    this.writer = writer;
+  }
+
+  @Override
+  public void readMessage(ConsumerRecord<?, ?> record) {
+    try {
+      byte[] recordArray = (byte[]) record.value();
+      JsonObject jsonObj = (new JsonParser()).parse(new String(recordArray, Charsets.UTF_8)).getAsJsonObject();
+      jsonObj.addProperty(KAFKA_TOPIC.getFieldName(), record.topic());
+      jsonObj.addProperty(KAFKA_PARTITION_ID.getFieldName(), record.partition());
+      jsonObj.addProperty(KAFKA_OFFSET.getFieldName(), record.offset());
+      jsonObj.addProperty(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
+      jsonObj.addProperty(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
+      jsonReader.setSource(jsonObj.toString().getBytes(Charsets.UTF_8));
+      jsonReader.write(writer);
+    } catch (IOException e) {
+      throw UserException.dataReadError(e).message(e.getMessage())
+          .addContext("MessageReader", JsonMessageReader.class.getName()).build(logger);
+    }
+  }
+
+  @Override
+  public void ensureAtLeastOneField() {
+    jsonReader.ensureAtLeastOneField(writer);
+  }
+
+  @Override
+  public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
+    return plugin.registerConsumer(new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.writer.clear();
+    try {
+      this.writer.close();
+    } catch (Exception e) {
+      logger.warn("Error while closing JsonMessageReader", e);
+    }
+  }
+
+}
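
As an illustration (all values hypothetical), a message {"user" : "alice"} consumed from offset 42 of partition 1 of topic "clicks" reaches the JsonReader enriched roughly as:

    {"user" : "alice", "kafkaTopic" : "clicks", "kafkaPartitionId" : 1,
     "kafkaMsgOffset" : 42, "kafkaMsgTimestamp" : 1511827200000, "kafkaMsgKey" : null}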

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
new file mode 100644
index 0000000..510a520
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader interface that provides a mechanism for handling various Kafka
+ * message formats such as JSON, Avro, or custom formats.
+ */
+public interface MessageReader extends Closeable {
+
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
+      boolean readNumbersAsDouble);
+
+  public void readMessage(ConsumerRecord<?, ?> message);
+
+  public void ensureAtLeastOneField();
+
+  public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
+}
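
A sketch of how an alternative message format could plug in; the CSV reader below is hypothetical, assumes the imports shown above plus java.io.IOException and Kafka's ByteArrayDeserializer, and relies only on the interface methods declared in this patch:

    public class CsvMessageReader implements MessageReader {
      private VectorContainerWriter writer;

      @Override
      public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer,
          boolean allTextMode, boolean readNumbersAsDouble) {
        this.writer = writer; // set up format-specific parser state here
      }

      @Override
      public void readMessage(ConsumerRecord<?, ?> message) {
        // parse message.value() and write the fields through the vector writer
      }

      @Override
      public void ensureAtLeastOneField() {
        // guarantee a non-empty schema when no fields were written
      }

      @Override
      public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
        return plugin.registerConsumer(new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
            new ByteArrayDeserializer(), new ByteArrayDeserializer()));
      }

      @Override
      public void close() throws IOException {
        writer.clear();
      }
    }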

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
new file mode 100644
index 0000000..cd83f96
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initializes a Kafka message reader based on the store.kafka.record.reader
+   * session property.
+   *
+   * @param messageReaderKlass
+   *          value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *           in case of any message reader initialization failure
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+    if (messageReaderKlass == null) {
+      throw UserException.validationError()
+          .message("Please configure message reader implementation using the 
property 'store.kafka.record.reader'")
+          .build(logger);
+    }
+
+    MessageReader messageReader = null;
+    try {
+      Class<?> klass = Class.forName(messageReaderKlass);
+      if (MessageReader.class.isAssignableFrom(klass)) {
+        messageReader = (MessageReader) klass.newInstance();
+        logger.info("Initialized Message Reader : {}", messageReader);
+      }
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw UserException.validationError().message("Failed to initialize message reader : %s", messageReaderKlass)
+          .build(logger);
+    }
+
+    if (messageReader == null) {
+      throw UserException.validationError().message("Message reader configured 
'%s' does not implement '%s'",
+          messageReaderKlass, MessageReader.class.getName()).build(logger);
+    }
+    return messageReader;
+  }
+}
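
A minimal usage sketch of the factory, naming the JsonMessageReader added by this patch:

    MessageReader reader = MessageReaderFactory.getMessageReader(
        "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader");
    // init(...) is then called with the scan's DrillBuf, projected columns and vector writer.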

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java
new file mode 100644
index 0000000..53c3493
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Kafka storage plugin.
+ *
+ * Enables querying Kafka as a data store; JSON and Avro message types are
+ * supported.
+ */
+package org.apache.drill.exec.store.kafka;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
new file mode 100644
index 0000000..65b532d
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.schema;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.kafka.KafkaScanSpec;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+public class KafkaMessageSchema extends AbstractSchema {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSchema.class);
+  private final KafkaStoragePlugin plugin;
+  private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+  private Set<String> tableNames;
+
+  public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) {
+    super(ImmutableList.<String> of(), name);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public String getTypeName() {
+    return KafkaStoragePluginConfig.NAME;
+  }
+
+  void setHolder(SchemaPlus plusOfThis) {
+    for (String s : getSubSchemaNames()) {
+      plusOfThis.add(s, getSubSchema(s));
+    }
+  }
+
+  @Override
+  public Table getTable(String tableName) {
+    if (!drillTables.containsKey(tableName)) {
+      KafkaScanSpec scanSpec = new KafkaScanSpec(tableName);
+      DrillTable table = new DynamicDrillTable(plugin, getName(), scanSpec);
+      drillTables.put(tableName, table);
+    }
+
+    return drillTables.get(tableName);
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    if (tableNames == null) {
+      try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
+        tableNames = kafkaConsumer.listTopics().keySet();
+      } catch (KafkaException e) {
+        throw UserException.dataReadError(e).message("Failed to get tables information").addContext(e.getMessage())
+            .build(logger);
+      }
+    }
+    return tableNames;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
new file mode 100644
index 0000000..8f44a93
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.schema;
+
+import java.io.IOException;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+
+public class KafkaSchemaFactory implements SchemaFactory {
+
+  private final String schemaName;
+  private final KafkaStoragePlugin plugin;
+
+  public KafkaSchemaFactory(KafkaStoragePlugin plugin, String schemaName) {
+    this.plugin = plugin;
+    this.schemaName = schemaName;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
+      throws IOException {
+    KafkaMessageSchema schema = new KafkaMessageSchema(this.plugin, this.schemaName);
+    SchemaPlus hPlus = parent.add(schemaName, schema);
+    schema.setHolder(hPlus);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..406c030
--- /dev/null
+++ b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+  "storage":{
+    "kafka" : {
+      "type" : "kafka",
+      "enabled" : false,
+      "kafkaConsumerProps" : {"bootstrap.servers" : "localhost:9092", "group.id" : "drill-consumer"}
+    }
+  }
+}
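
Once the plugin is enabled (for instance through Drill's Web UI), the same structure applies; the broker list below is hypothetical, and bootstrap.servers accepts a comma-separated list of host:port pairs:

    {
      "type" : "kafka",
      "enabled" : true,
      "kafkaConsumerProps" : {"bootstrap.servers" : "broker1:9092,broker2:9092", "group.id" : "drill-consumer"}
    }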

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/resources/drill-module.conf b/contrib/storage-kafka/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..4b813a4
--- /dev/null
+++ b/contrib/storage-kafka/src/main/resources/drill-module.conf
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.kafka"
+}
+drill.exec: {
+
+  sys.store.provider: {
+    kafka : {
+      "kafkaConsumerProps" : "{\"bootstrap.servers\" : \"localhost:9092\"}"
+    }
+  }
+
+}
