This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 287adec  DRILL-5977: Implement Filter Pushdown in Drill-Kafka plugin
287adec is described below

commit 287adec537f654e548404d1d337133fd21a1a937
Author: Abhishek Ravi <[email protected]>
AuthorDate: Sun Apr 8 14:01:31 2018 -0700

    DRILL-5977: Implement Filter Pushdown in Drill-Kafka plugin
    
    closes #1272
---
 .../drill/exec/store/kafka/KafkaGroupScan.java     |  90 +++---
 .../drill/exec/store/kafka/KafkaNodeProcessor.java | 186 +++++++++++
 .../exec/store/kafka/KafkaPartitionScanSpec.java   | 100 ++++++
 .../store/kafka/KafkaPartitionScanSpecBuilder.java | 345 ++++++++++++++++++++
 .../store/kafka/KafkaPushDownFilterIntoScan.java   |  81 +++++
 .../drill/exec/store/kafka/KafkaRecordReader.java  |   5 +-
 .../exec/store/kafka/KafkaScanBatchCreator.java    |   2 +-
 .../drill/exec/store/kafka/KafkaStoragePlugin.java |   2 +-
 .../drill/exec/store/kafka/KafkaSubScan.java       |  72 +----
 .../drill/exec/store/kafka/MessageIterator.java    |   3 +-
 .../exec/store/kafka/KafkaFilterPushdownTest.java  | 359 +++++++++++++++++++++
 .../exec/store/kafka/KafkaMessageGenerator.java    |  34 ++
 .../exec/store/kafka/MessageIteratorTest.java      |   5 +-
 .../drill/exec/store/kafka/TestKafkaSuit.java      |  18 +-
 .../drill/exec/store/kafka/TestQueryConstants.java |  12 +
 15 files changed, 1197 insertions(+), 117 deletions(-)

diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index 9cf575b..976c82a 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -19,10 +19,12 @@ package org.apache.drill.exec.store.kafka;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -35,7 +37,6 @@ import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteWork;
@@ -72,10 +73,11 @@ public class KafkaGroupScan extends AbstractGroupScan {
   private final KafkaScanSpec kafkaScanSpec;
 
   private List<SchemaPath> columns;
-  private List<PartitionScanWork> partitionWorkList;
   private ListMultimap<Integer, PartitionScanWork> assignments;
   private List<EndpointAffinity> affinities;
 
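+  // Scan work units keyed by topic-partition so filter pushdown can replace entries selectively.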
+  private Map<TopicPartition, PartitionScanWork> partitionWorkMap;
+
   @JsonCreator
   public KafkaGroupScan(@JsonProperty("userName") String userName,
                         @JsonProperty("kafkaStoragePluginConfig") 
KafkaStoragePluginConfig kafkaStoragePluginConfig,
@@ -112,34 +114,18 @@ public class KafkaGroupScan extends AbstractGroupScan {
     this.kafkaStoragePlugin = that.kafkaStoragePlugin;
     this.columns = that.columns;
     this.kafkaScanSpec = that.kafkaScanSpec;
-    this.partitionWorkList = that.partitionWorkList;
     this.assignments = that.assignments;
+    this.partitionWorkMap = that.partitionWorkMap;
   }
 
-  private static class PartitionScanWork implements CompleteWork {
-
-    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
-
-    private final TopicPartition topicPartition;
-    private final long beginOffset;
-    private final long latestOffset;
-
-    public PartitionScanWork(TopicPartition topicPartition, long beginOffset, long latestOffset) {
-      this.topicPartition = topicPartition;
-      this.beginOffset = beginOffset;
-      this.latestOffset = latestOffset;
-    }
-
-    public TopicPartition getTopicPartition() {
-      return topicPartition;
-    }
+  public static class PartitionScanWork implements CompleteWork {
 
-    public long getBeginOffset() {
-      return beginOffset;
-    }
+    private final EndpointByteMapImpl byteMap;
+    private final KafkaPartitionScanSpec partitionScanSpec;
 
-    public long getLatestOffset() {
-      return latestOffset;
+    public PartitionScanWork(EndpointByteMap byteMap, KafkaPartitionScanSpec partitionScanSpec) {
+      this.byteMap = (EndpointByteMapImpl)byteMap;
+      this.partitionScanSpec = partitionScanSpec;
     }
 
     @Override
@@ -149,7 +135,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
 
     @Override
     public long getTotalBytes() {
-      return (latestOffset - beginOffset) * MSG_SIZE;
+      return (partitionScanSpec.getEndOffset() - partitionScanSpec.getStartOffset()) * MSG_SIZE;
     }
 
     @Override
@@ -157,6 +143,9 @@ public class KafkaGroupScan extends AbstractGroupScan {
       return byteMap;
     }
 
+    public KafkaPartitionScanSpec getPartitionScanSpec() {
+      return partitionScanSpec;
+    }
   }
 
   /**
@@ -164,7 +153,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
    * corresponding topicPartition
    */
   private void init() {
-    partitionWorkList = Lists.newArrayList();
+    partitionWorkMap = Maps.newHashMap();
     Collection<DrillbitEndpoint> endpoints = kafkaStoragePlugin.getContext().getBits();
     Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
     for (DrillbitEndpoint endpoint : endpoints) {
@@ -211,12 +200,13 @@ public class KafkaGroupScan extends AbstractGroupScan {
 
     // computes work for each end point
     for (PartitionInfo partitionInfo : topicPartitions) {
-      TopicPartition topicPartition = new TopicPartition(topicName, partitionInfo.partition());
+      TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
       long lastCommittedOffset = startOffsetsMap.get(topicPartition);
       long latestOffset = endOffsetsMap.get(topicPartition);
       logger.debug("Latest offset of {} is {}", topicPartition, latestOffset);
       logger.debug("Last committed offset of {} is {}", topicPartition, 
lastCommittedOffset);
-      PartitionScanWork work = new PartitionScanWork(topicPartition, lastCommittedOffset, latestOffset);
+      KafkaPartitionScanSpec partitionScanSpec = new KafkaPartitionScanSpec(topicPartition.topic(), topicPartition.partition(), lastCommittedOffset, latestOffset);
+      PartitionScanWork work = new PartitionScanWork(new EndpointByteMapImpl(), partitionScanSpec);
       Node[] inSyncReplicas = partitionInfo.inSyncReplicas();
       for (Node isr : inSyncReplicas) {
         String host = isr.host();
@@ -225,23 +215,22 @@ public class KafkaGroupScan extends AbstractGroupScan {
           work.getByteMap().add(ep, work.getTotalBytes());
         }
       }
-      partitionWorkList.add(work);
+      partitionWorkMap.put(topicPartition, work);
     }
   }
 
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    assignments = AssignmentCreator.getMappings(incomingEndpoints, partitionWorkList);
+    assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
 
   @Override
   public KafkaSubScan getSpecificScan(int minorFragmentId) {
     List<PartitionScanWork> workList = assignments.get(minorFragmentId);
-    List<KafkaSubScanSpec> scanSpecList = Lists.newArrayList();
+    List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
 
     for (PartitionScanWork work : workList) {
-      scanSpecList.add(new KafkaSubScanSpec(work.getTopicPartition().topic(), work.getTopicPartition().partition(),
-          work.getBeginOffset(), work.getLatestOffset()));
+      scanSpecList.add(work.partitionScanSpec);
     }
 
     return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList);
@@ -249,14 +238,14 @@ public class KafkaGroupScan extends AbstractGroupScan {
 
   @Override
   public int getMaxParallelizationWidth() {
-    return partitionWorkList.size();
+    return partitionWorkMap.values().size();
   }
 
   @Override
   public ScanStats getScanStats() {
     long messageCount = 0;
-    for (PartitionScanWork work : partitionWorkList) {
-      messageCount += (work.getLatestOffset() - work.getBeginOffset());
+    for (PartitionScanWork work : partitionWorkMap.values()) {
+      messageCount += (work.getPartitionScanSpec().getEndOffset() - work.getPartitionScanSpec().getStartOffset());
     }
     return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, messageCount, 1, messageCount * MSG_SIZE);
   }
@@ -275,7 +264,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
     if (affinities == null) {
-      affinities = AffinityCreator.getAffinityMap(partitionWorkList);
+      affinities = AffinityCreator.getAffinityMap(Lists.newArrayList(partitionWorkMap.values()));
     }
     return affinities;
   }
@@ -293,6 +282,23 @@ public class KafkaGroupScan extends AbstractGroupScan {
     return clone;
   }
 
+  public GroupScan cloneWithNewSpec(List<KafkaPartitionScanSpec> partitionScanSpecList) {
+    KafkaGroupScan clone = new KafkaGroupScan(this);
+    HashSet<TopicPartition> partitionsInSpec = Sets.newHashSet();
+
+    for(KafkaPartitionScanSpec scanSpec : partitionScanSpecList) {
+      TopicPartition tp = new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId());
+      partitionsInSpec.add(tp);
+
+      PartitionScanWork newScanWork = new PartitionScanWork(partitionWorkMap.get(tp).getByteMap(), scanSpec);
+      clone.partitionWorkMap.put(tp, newScanWork);
+    }
+
+    //Remove unnecessary partitions from partitionWorkMap
+    clone.partitionWorkMap.keySet().removeIf(tp -> !partitionsInSpec.contains(tp));
+    return clone;
+  }
+
   @JsonProperty
   public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
     return kafkaStoragePlugin.getConfig();
@@ -318,4 +324,12 @@ public class KafkaGroupScan extends AbstractGroupScan {
     return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", 
kafkaScanSpec, columns);
   }
 
+  @JsonIgnore
+  public List<KafkaPartitionScanSpec> getPartitionScanSpecList() {
+    List<KafkaPartitionScanSpec> partitionScanSpecList = Lists.newArrayList();
+    for (PartitionScanWork work : partitionWorkMap.values()) {
+      partitionScanSpecList.add(work.partitionScanSpec.clone());
+    }
+    return partitionScanSpecList;
+  }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
new file mode 100644
index 0000000..ba39b76
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
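+/**
+ * Checks whether a single comparison on kafkaMsgOffset, kafkaMsgTimestamp or
+ * kafkaPartitionId can be pushed down, capturing the field path, the (possibly
+ * transposed) function name and the literal value.
+ */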
+class KafkaNodeProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+  private String functionName;
+  private Boolean success;
+  private Long value;
+  private String path;
+
+  public KafkaNodeProcessor(String functionName) {
+    this.functionName = functionName;
+    this.success = false;
+  }
+
+  public static boolean isPushdownFunction(String functionName) {
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+    return false;
+  }
+
+  public static KafkaNodeProcessor process(FunctionCall call) {
+    String functionName = call.getName();
+    LogicalExpression nameArg = call.args.get(0);
+    LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
+    KafkaNodeProcessor evaluator = new KafkaNodeProcessor(functionName);
+
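+    // If the literal is on the left-hand side (e.g. 5 < kafkaMsgOffset), swap the
+    // arguments and transpose the comparison function.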
+    if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+      LogicalExpression swapArg = valueArg;
+      valueArg = nameArg;
+      nameArg = swapArg;
+      evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+    }
+    evaluator.success = nameArg.accept(evaluator, valueArg);
+    return evaluator;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public Long getValue() {
+    return value;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+    this.path = path.getRootSegmentPath();
+
+    if(valueArg == null) {
+      return false;
+    }
+
+    switch (this.path) {
+      case "kafkaMsgOffset":
+        /*
+         * Do not push down "not_equal" on kafkaMsgOffset.
+         */
+        if(functionName.equals("not_equal")) {
+          return false;
+        }
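+        // Intentional fall-through: kafkaMsgOffset reuses the Int/Long value extraction below.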
+      case "kafkaPartitionId":
+        if(valueArg instanceof IntExpression) {
+          value = (long) ((IntExpression) valueArg).getInt();
+          return true;
+        }
+
+        if(valueArg instanceof LongExpression) {
+          value = ((LongExpression) valueArg).getLong();
+          return true;
+        }
+        break;
+      case "kafkaMsgTimestamp":
+        /*
+         * Only push down "equal", "greater_than" and "greater_than_or_equal_to" on kafkaMsgTimestamp.
+         */
+        if(!functionName.equals("equal") && !functionName.equals("greater_than")
+               && !functionName.equals("greater_than_or_equal_to")) {
+          return false;
+        }
+
+        if(valueArg instanceof LongExpression) {
+          value = ((LongExpression) valueArg).getLong();
+          return true;
+        }
+
+        if (valueArg instanceof DateExpression) {
+          value = ((DateExpression)valueArg).getDate();
+          return true;
+        }
+
+        if (valueArg instanceof TimeExpression) {
+          value = (long) ((TimeExpression)valueArg).getTime();
+          return true;
+        }
+
+        if (valueArg instanceof TimeStampExpression) {
+          value = ((TimeStampExpression) valueArg).getTimeStamp();
+          return true;
+        }
+
+        if(valueArg instanceof IntExpression) {
+          value = (long) ((IntExpression) valueArg).getInt();
+          return true;
+        }
+        break;
+    }
+    return false;
+  }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+                                   .add(BooleanExpression.class)
+                                   .add(DateExpression.class)
+                                   .add(DoubleExpression.class)
+                                   .add(FloatExpression.class)
+                                   .add(IntExpression.class)
+                                   .add(LongExpression.class)
+                                   .add(QuotedString.class)
+                                   .add(TimeExpression.class)
+                                   .build();
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+                                          .put("equal", "equal")
+                                          .put("not_equal", "not_equal")
+                                          .put("greater_than_or_equal_to", "less_than_or_equal_to")
+                                          .put("greater_than", "less_than")
+                                          .put("less_than_or_equal_to", "greater_than_or_equal_to")
+                                          .put("less_than", "greater_than")
+                                          .build();
+  }
+
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
new file mode 100644
index 0000000..713f62e
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class KafkaPartitionScanSpec {
+  private String topicName;
+  private int partitionId;
+  private long startOffset;
+  private long endOffset;
+
+  @JsonCreator
+  public KafkaPartitionScanSpec(@JsonProperty("topicName") String topicName,
+                                @JsonProperty("partitionId") int partitionId,
+                                @JsonProperty("startOffset") long startOffset,
+                                @JsonProperty("endOffset") long endOffset) {
+    this.topicName = topicName;
+    this.partitionId = partitionId;
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getEndOffset() {
+    return endOffset;
+  }
+
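+  // Narrows (booleanAnd) or widens (booleanOr) this spec's offset range using
+  // another spec for the same topic-partition.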
+  public void mergeScanSpec(String functionName, KafkaPartitionScanSpec scanSpec) {
+    switch (functionName) {
+      case "booleanAnd":
+        //Reduce the scan range
+        if(startOffset < scanSpec.startOffset) {
+          startOffset = scanSpec.startOffset;
+        }
+
+        if(endOffset > scanSpec.endOffset) {
+          endOffset = scanSpec.endOffset;
+        }
+        break;
+      case "booleanOr":
+        //Increase the scan range
+        if(scanSpec.startOffset < startOffset) {
+          startOffset = scanSpec.startOffset;
+        }
+
+        if(scanSpec.endOffset > endOffset) {
+          endOffset = scanSpec.endOffset;
+        }
+        break;
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof KafkaPartitionScanSpec) {
+      KafkaPartitionScanSpec that = ((KafkaPartitionScanSpec)obj);
+      return this.topicName.equals(that.topicName) && this.partitionId == that.partitionId
+                 && this.startOffset == that.startOffset && this.endOffset == that.endOffset;
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaPartitionScanSpec [topicName=" + topicName + ", partitionId=" 
+ partitionId + ", startOffset="
+               + startOffset + ", endOffset=" + endOffset + "]";
+  }
+
+  public KafkaPartitionScanSpec clone() {
+    return new KafkaPartitionScanSpec(topicName, partitionId, startOffset, endOffset);
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
new file mode 100644
index 0000000..b52ed44
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
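+/**
+ * Visits the pushed-down filter expression and computes, per topic-partition,
+ * the offset range that actually needs to be scanned. Returns null wherever an
+ * expression cannot be pushed down.
+ */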
+public class KafkaPartitionScanSpecBuilder extends
+    AbstractExprVisitor<List<KafkaPartitionScanSpec>, Void, RuntimeException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPartitionScanSpecBuilder.class);
+  private final LogicalExpression le;
+  private final KafkaGroupScan groupScan;
+  private final KafkaConsumer<?, ?> kafkaConsumer;
+  private ImmutableMap<TopicPartition, KafkaPartitionScanSpec> fullScanSpec;
+  private static final long CLOSE_TIMEOUT_MS = 200;
+
+  public KafkaPartitionScanSpecBuilder(KafkaGroupScan groupScan, LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    le = conditionExp;
+  }
+
+  public List<KafkaPartitionScanSpec> parseTree() {
+    ImmutableMap.Builder<TopicPartition, KafkaPartitionScanSpec> builder = ImmutableMap.builder();
+    for(KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) {
+      builder.put(new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId()), scanSpec);
+    }
+    fullScanSpec = builder.build();
+    List<KafkaPartitionScanSpec> pushdownSpec = le.accept(this, null);
+
+    /*
    Non-existing / invalid partitions may result in an empty scan spec.
    This results in a "ScanBatch" with no reader. Drill currently requires
    at least one reader to be present in a scan batch.
+     */
+    if(pushdownSpec != null && pushdownSpec.isEmpty()) {
+      TopicPartition firstPartition = new TopicPartition(groupScan.getKafkaScanSpec().getTopicName(), 0);
+      KafkaPartitionScanSpec emptySpec =
+          new KafkaPartitionScanSpec(firstPartition.topic(), firstPartition.partition(),
+              fullScanSpec.get(firstPartition).getEndOffset(), fullScanSpec.get(firstPartition).getEndOffset());
+      pushdownSpec.add(emptySpec);
+    }
+    return pushdownSpec;
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value)
+      throws RuntimeException {
+    return null;
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value)
+      throws RuntimeException {
+
+    Map<TopicPartition, KafkaPartitionScanSpec> specMap = Maps.newHashMap();
+    ImmutableList<LogicalExpression> args = op.args;
+    if(op.getName().equals("booleanOr")) {
+
+      for(LogicalExpression expr : args) {
+        List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
+        //parsedSpec is null if expression cannot be pushed down
+        if(parsedSpec != null) {
+          for(KafkaPartitionScanSpec newSpec : parsedSpec) {
+            TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
+            KafkaPartitionScanSpec existingSpec = specMap.get(tp);
+
+            //If existing spec does not contain topic-partition
+            if(existingSpec == null) {
+              specMap.put(tp, newSpec); //Add topic-partition to spec for OR
+            } else {
+              existingSpec.mergeScanSpec(op.getName(), newSpec);
+              specMap.put(tp, existingSpec);
+            }
+          }
+        } else {
+          return null; //At any level, all arguments of booleanOr should support pushdown, else return null
+        }
+      }
+    } else { //booleanAnd
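+      // Start from the full per-partition scan ranges and narrow them with each conjunct that supports pushdown.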
+      specMap.putAll(fullScanSpec);
+      for(LogicalExpression expr : args) {
+        List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
+
+        //parsedSpec is null if expression cannot be pushed down
+        if(parsedSpec != null) {
+          Set<TopicPartition> partitionsInNewSpec = Sets.newHashSet(); //Store topic-partitions returned from new spec.
+
+          for (KafkaPartitionScanSpec newSpec : parsedSpec) {
+            TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
+            partitionsInNewSpec.add(tp);
+            KafkaPartitionScanSpec existingSpec = specMap.get(tp);
+
+            if (existingSpec != null) {
+              existingSpec.mergeScanSpec(op.getName(), newSpec);
+              specMap.put(tp, existingSpec);
+            }
+          }
+
+          /*
+           * For "booleanAnd", handle the case where the condition is on `kafkaPartitionId`.
+           * In this case, we do not want to unnecessarily scan all the topic-partitions,
+           * hence we remove the unnecessary topic-partitions from the spec.
+           */
+          specMap.keySet().removeIf(partition -> !partitionsInNewSpec.contains(partition));
+        }
+
+      }
+    }
+    return Lists.newArrayList(specMap.values());
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value)
+      throws RuntimeException {
+
+    String functionName = call.getName();
+    if(KafkaNodeProcessor.isPushdownFunction(functionName)) {
+
+      KafkaNodeProcessor kafkaNodeProcessor = KafkaNodeProcessor.process(call);
+      if(kafkaNodeProcessor.isSuccess()) {
+        switch (kafkaNodeProcessor.getPath()) {
+          case "kafkaMsgTimestamp":
+            return createScanSpecForTimestamp(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+          case "kafkaMsgOffset":
+            return createScanSpecForOffset(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+          case "kafkaPartitionId":
+            return createScanSpecForPartition(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+        }
+      }
+    }
+    return null; //Return null, do not pushdown
+  }
+
+
+  private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName,
+                                                                  Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
+    Map<TopicPartition, Long> timesValMap = Maps.newHashMap();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
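+    // KafkaConsumer.offsetsForTimes returns, per partition, the earliest offset whose timestamp
+    // is >= the requested value, so "greater_than" advances the target by 1 to make it strict.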
+    for(TopicPartition partitions : topicPartitions) {
+      timesValMap.put(partitions, functionName.equals("greater_than") ? fieldValue + 1 : fieldValue);
+    }
+
+    Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = kafkaConsumer.offsetsForTimes(timesValMap);
+
+    for(TopicPartition tp : topicPartitions) {
+      OffsetAndTimestamp value = offsetAndTimestamp.get(tp);
+      //OffsetAndTimestamp is null if there is no offset greater or equal to requested timestamp
+      if(value == null) {
+        scanSpec.add(
+            new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset()));
+      } else {
+        scanSpec.add(
+            new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                value.offset(), fullScanSpec.get(tp).getEndOffset()));
+      }
+    }
+
+    return scanSpec;
+  }
+
+  private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName,
+                                                               Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
+    /*
+     * We should handle the case where the specified offset does not exist in the
+     * current context, i.e., fieldValue < startOffset or fieldValue > endOffset in a
+     * particular topic-partition; else KafkaConsumer.poll will throw a TimeoutException.
+     */
+
+    switch (functionName) {
+      case "equal":
+        for(TopicPartition tp : topicPartitions) {
+          if(fieldValue < fullScanSpec.get(tp).getStartOffset()) {
+            //Offset does not exist
+            scanSpec.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset()));
+          } else {
+            long val = Math.min(fieldValue, fullScanSpec.get(tp).getEndOffset());
+            long nextVal = Math.min(val + 1, fullScanSpec.get(tp).getEndOffset());
+            scanSpec.add(new KafkaPartitionScanSpec(tp.topic(), tp.partition(), val, nextVal));
+          }
+        }
+        break;
+      case "greater_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset.
+          long val = bindOffsetToRange(tp, fieldValue);
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(), val,
+                  fullScanSpec.get(tp).getEndOffset()));
+        }
+        break;
+      case "greater_than":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset.
+          long val = bindOffsetToRange(tp, fieldValue + 1);
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  val, fullScanSpec.get(tp).getEndOffset()));
+        }
+        break;
+      case "less_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset.
+          long val = bindOffsetToRange(tp, fieldValue + 1);
+
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  fullScanSpec.get(tp).getStartOffset(), val));
+        }
+        break;
+      case "less_than":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset.
+          long val = bindOffsetToRange(tp, fieldValue);
+
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  fullScanSpec.get(tp).getStartOffset(), val));
+        }
+        break;
+    }
+    return scanSpec;
+  }
+
+  private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName,
+                                                                  Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
+    switch (functionName) {
+      case "equal":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() == fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "not_equal":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() != fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "greater_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() >= fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "greater_than":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() > fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "less_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() <= fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "less_than":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() < fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+    }
+    return scanSpecList;
+  }
+
+  void close() {
+    kafkaConsumer.close(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+  }
+
+  private long bindOffsetToRange(TopicPartition tp, long offset) {
+    return Math.max(fullScanSpec.get(tp).getStartOffset(), Math.min(offset, fullScanSpec.get(tp).getEndOffset()));
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
new file mode 100644
index 0000000..bf11f85
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import java.util.List;
+
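+/**
+ * Planner rule that pushes filters on kafkaMsgOffset, kafkaMsgTimestamp and
+ * kafkaPartitionId into the Kafka group scan as per-partition offset ranges.
+ */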
+public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class);
+
+  public static final StoragePluginOptimizerRule INSTANCE =
+      new KafkaPushDownFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+          "KafkaPushFilterIntoScan:Filter_On_Scan");
+
+  private KafkaPushDownFilterIntoScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final ScanPrel scan = call.rel(1);
+    final FilterPrel filter = call.rel(0);
+    final RexNode condition = filter.getCondition();
+
+    LogicalExpression conditionExp =
+        DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+
+    KafkaGroupScan groupScan = (KafkaGroupScan) scan.getGroupScan();
+    logger.info("Partitions ScanSpec before pushdown: " + 
groupScan.getPartitionScanSpecList());
+    KafkaPartitionScanSpecBuilder builder = new 
KafkaPartitionScanSpecBuilder(groupScan, conditionExp);
+    List<KafkaPartitionScanSpec> newScanSpec = null;
+    newScanSpec = builder.parseTree();
+    builder.close(); //Close consumer
+
+    //No pushdown
+    if(newScanSpec == null) {
+      return;
+    }
+
+    logger.info("Partitions ScanSpec after pushdown: " + newScanSpec);
+    GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec);
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupScan, scan.getRowType());
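+    // Keep the original filter on top of the new scan: offset pruning narrows the range but may not be exact.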
+    call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(newScanPrel)));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    if (scan.getGroupScan() instanceof KafkaGroupScan) {
+      return super.matches(call);
+    }
+    return false;
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index b1cf9cd..a0fc1f1 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -31,7 +31,6 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -52,7 +51,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
 
   private final boolean unionEnabled;
   private final KafkaStoragePlugin plugin;
-  private final KafkaSubScanSpec subScanSpec;
+  private final KafkaPartitionScanSpec subScanSpec;
   private final long kafkaPollTimeOut;
 
   private long currentOffset;
@@ -62,7 +61,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
   private final boolean readNumbersAsDouble;
   private final String kafkaMsgReader;
 
-  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, KafkaStoragePlugin plugin) {
     setColumns(projectedColumns);
     final OptionManager optionManager = context.getOptions();
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 9bedbd5..ae78d8c 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -44,7 +44,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
     List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
 
     List<RecordReader> readers = new LinkedList<>();
-    for (KafkaSubScan.KafkaSubScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
+    for (KafkaPartitionScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
       readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()));
     }
 
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index 157e367..4ca91ec 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -71,7 +71,7 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin {
 
   @Override
   public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return ImmutableSet.of();
+    return ImmutableSet.of(KafkaPushDownFilterIntoScan.INSTANCE);
   }
 
   @Override
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index 468f766..d62faa6 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -43,14 +43,14 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
 
   private final KafkaStoragePlugin kafkaStoragePlugin;
   private final List<SchemaPath> columns;
-  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+  private final List<KafkaPartitionScanSpec> partitionSubScanSpecList;
 
   @JsonCreator
   public KafkaSubScan(@JacksonInject StoragePluginRegistry registry,
                       @JsonProperty("userName") String userName,
                       @JsonProperty("kafkaStoragePluginConfig") 
KafkaStoragePluginConfig kafkaStoragePluginConfig,
                       @JsonProperty("columns") List<SchemaPath> columns,
-                      @JsonProperty("partitionSubScanSpecList") 
LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
+                      @JsonProperty("partitionSubScanSpecList") 
LinkedList<KafkaPartitionScanSpec> partitionSubScanSpecList)
       throws ExecutionSetupException {
     this(userName,
         (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig),
@@ -61,7 +61,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
   public KafkaSubScan(String userName,
                       KafkaStoragePlugin kafkaStoragePlugin,
                       List<SchemaPath> columns,
-                      List<KafkaSubScanSpec> partitionSubScanSpecList) {
+                      List<KafkaPartitionScanSpec> partitionSubScanSpecList) {
     super(userName);
     this.kafkaStoragePlugin = kafkaStoragePlugin;
     this.columns = columns;
@@ -95,7 +95,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
   }
 
   @JsonProperty
-  public List<KafkaSubScanSpec> getPartitionSubScanSpecList() {
+  public List<KafkaPartitionScanSpec> getPartitionSubScanSpecList() {
     return partitionSubScanSpecList;
   }
 
@@ -108,68 +108,4 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
   public int getOperatorType() {
     return CoreOperatorType.KAFKA_SUB_SCAN_VALUE;
   }
-
-  public static class KafkaSubScanSpec {
-    protected String topicName;
-    protected int partitionId;
-    protected long startOffset;
-    protected long endOffset;
-
-    @JsonCreator
-    public KafkaSubScanSpec(@JsonProperty("topicName") String topicName, @JsonProperty("partitionId") int partitionId,
-        @JsonProperty("startOffset") long startOffset, @JsonProperty("endOffset") long endOffset) {
-      this.topicName = topicName;
-      this.partitionId = partitionId;
-      this.startOffset = startOffset;
-      this.endOffset = endOffset;
-    }
-
-    KafkaSubScanSpec() {
-
-    }
-
-    public String getTopicName() {
-      return topicName;
-    }
-
-    public int getPartitionId() {
-      return partitionId;
-    }
-
-    public long getStartOffset() {
-      return startOffset;
-    }
-
-    public long getEndOffset() {
-      return endOffset;
-    }
-
-    public KafkaSubScanSpec setTopicName(String topicName) {
-      this.topicName = topicName;
-      return this;
-    }
-
-    public KafkaSubScanSpec setPartitionId(int partitionId) {
-      this.partitionId = partitionId;
-      return this;
-    }
-
-    public KafkaSubScanSpec setStartOffset(long startOffset) {
-      this.startOffset = startOffset;
-      return this;
-    }
-
-    public KafkaSubScanSpec setEndOffset(long endOffset) {
-      this.endOffset = endOffset;
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return "KafkaSubScanSpec [topicName=" + topicName + ", partitionId=" + 
partitionId + ", startOffset="
-          + startOffset + ", endOffset=" + endOffset + "]";
-    }
-
-  }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
index 3afb1b8..1c814f6 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,7 +45,7 @@ public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>>
   private final long kafkaPollTimeOut;
   private final long endOffset;
 
-  public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer, final KafkaSubScanSpec subScanSpec,
+  public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer, final KafkaPartitionScanSpec subScanSpec,
       final long kafkaPollTimeOut) {
     this.kafkaConsumer = kafkaConsumer;
     this.kafkaPollTimeOut = kafkaPollTimeOut;
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
new file mode 100644
index 0000000..7be0ec3
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.categories.KafkaStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG;
+import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
+
+@Category({KafkaStorageTest.class, SlowTest.class})
+public class KafkaFilterPushdownTest extends KafkaTestBase {
+  private static final int NUM_PARTITIONS = 5;
+  private static final String expectedSubStr = "    \"kafkaScanSpec\" : {\n" +
+                                                   "      \"topicName\" : \"drill-pushdown-topic\"\n" +
+                                                   "    },\n" +
+                                                   "    \"cost\" : %s.0";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestKafkaSuit.createTopicHelper(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_PARTITIONS);
+    KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
+        StringSerializer.class);
+    generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG);
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaMsgOffset.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnOffset() throws Exception {
+    final String predicate1 = "kafkaMsgOffset > 4";
+    final String predicate2 = "kafkaMsgOffset < 6";
+    final int expectedRowCount = 5; //1 * NUM_PARTITIONS
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaPartitionId.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnPartition() throws Exception {
+    final String predicate = "kafkaPartitionId = 1";
+    final int expectedRowCount = NUM_JSON_MSG;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaMsgTimestamp.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnTimestamp() throws Exception {
+    final String predicate = "kafkaMsgTimestamp > 6";
+    final int expectedRowCount = 20;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown when timestamp is not ordered.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownUnorderedTimestamp() throws Exception {
+    final String predicate = "kafkaMsgTimestamp = 1";
+    final int expectedRowInPlan = 50;
+    final int expectedRowCount = 5;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
+  }
+
+  /**
+   * Test filter pushdown when timestamp value specified does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWhenTimestampDoesNotExist() throws Exception {
+    final String predicate = "kafkaMsgTimestamp = 20"; //20 does not exist
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown when partition value specified does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWhenPartitionDoesNotExist() throws Exception {
+    final String predicate = "kafkaPartitionId = 100"; //100 does not exist
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown when timestamp exist but partition does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownForEmptyScanSpec() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 100";
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown on kafkaMsgOffset with boundary conditions.
+   * In every case, the number of records returned is 0.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
+    final int expectedRowCount = 0;
+
+    //"equal" such that value = endOffset
+    String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"equal" such that value < startOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than" such that value = endOffset-1
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than_or_equal" such that value = endOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"less_than" such that value = startOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"less_than_or_equal" such that value < startOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown on kafkaMsgOffset with boundary conditions.
+   * In every case, the number of records returned is 5 (1 per topic-partition).
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
+    final int expectedRowCount = 5;
+
+    //"equal" such that value = endOffset-1
+    String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than" such that value = endOffset-2
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than_or_equal" such that value = endOffset-1
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with OR.
+   * Pushdown is possible if all the predicates are on metadata fields.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithOr() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 1";
+    final int expectedRowCount = 26;
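+    // 4 rows per partition match timestamp > 6 (20 in total); partition 1
+    // adds its 10 rows, 4 of which are already counted: 20 + 10 - 4 = 26.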
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with OR on kafkaMsgTimestamp and kafkaMsgOffset.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithOr1() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp = 6";
+    final String predicate2 = "kafkaMsgOffset = 6";
+    final int expectedRowInPlan = 25; //startOff=5, endOff=9
+    final int expectedRowCount = 10;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
+  }
+
+  /**
+   * Test pushdown for a combination of AND and OR.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithAndOrCombo() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 1";
+    final String predicate3 = "kafkaPartitionId = 2";
+    final int expectedRowCount = 8;
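+    // 4 rows match timestamp > 6 in each of partitions 1 and 2.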
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_1,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test pushdown for a combination of AND and OR.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithAndOrCombo2() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp = 6";
+    final String predicate2 = "kafkaMsgOffset = 6";
+    final String predicate3 = "kafkaPartitionId = 1";
+    final String predicate4 = "kafkaPartitionId = 2";
+    final int expectedRowCountInPlan = 10; //startOff=5, endOff=9 for 2 partitions
+    final int expectedRowCount = 4;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_3,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
+  }
+
+  /**
+   * Test pushdown for predicate1 AND predicate2.
+   * Where predicate1 is on a metadata field and predicate2 is on user fields.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownTimestampWithNonMetaField() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "boolKey = true";
+    final int expectedRowCountInPlan = 20; //startOff=5, endOff=9
+    final int expectedRowCount = 10;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
+  }
+
+  /**
+   * Tests that pushdown does not happen for predicates such as
+   * non-metadata-field = val1 OR (kafkaMsgTimestamp > val2 AND kafkaMsgTimestamp < val3).
+   * @throws Exception
+   */
+  @Test
+  public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
+    final String predicate1 = "boolKey = true";
+    final String predicate2 = "kafkaMsgTimestamp > 6";
+    final String predicate3 = "kafkaMsgTimestamp < 9";
+    final int expectedRowCountInPlan = 50; //no pushdown
+    final int expectedRowCount = 30;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_2,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
+  }
+
+}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index 1931898..32d9ff1 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,4 +132,37 @@ public class KafkaMessageGenerator {
     }
   }
 
+  /**
+   * Publishes {@code numMsg} JSON messages to every partition of the topic,
+   * with explicit message timestamps that are deliberately not monotonic.
+   */
+  public void populateJsonMsgWithTimestamps(String topic, int numMsg) {
+    KafkaProducer<String, String> producer = null;
+    try {
+      producer = new KafkaProducer<String, String>(producerProperties);
+      int halfCount = numMsg / 2;
+
+      for (PartitionInfo tpInfo : producer.partitionsFor(topic)) {
+        for (int i = 1; i <= numMsg; ++i) {
+          JsonObject object = new JsonObject();
+          object.addProperty("stringKey", UUID.randomUUID().toString());
+          object.addProperty("intKey", numMsg - i);
+          object.addProperty("boolKey", i % 2 == 0);
+
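+          // Deliberately non-monotonic timestamps: for numMsg = 10 this
+          // yields 4,3,2,1,5,6,7,8,9,10 per partition, exercising pushdown
+          // when timestamps are not ordered by offset.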
+          long timestamp = i < halfCount ? (halfCount - i) : i;
+          ProducerRecord<String, String> message =
+              new ProducerRecord<String, String>(tpInfo.topic(), tpInfo.partition(), timestamp, "key" + i, object.toString());
+          logger.info("Publishing message : {}", message);
+          Future<RecordMetadata> future = producer.send(message);
+          logger.info("Committed offset of the message : {}", 
future.get().offset());
+        }
+
+      }
+    } catch (Throwable th) {
+      logger.error(th.getMessage(), th);
+      throw new DrillRuntimeException(th.getMessage(), th);
+    } finally {
+      if (producer != null) {
+        producer.close();
+      }
+    }
+  }
+
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
index 4a15596..4347167 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -25,7 +25,6 @@ import org.apache.drill.categories.KafkaStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -40,7 +39,7 @@ import org.junit.experimental.categories.Category;
 public class MessageIteratorTest extends KafkaTestBase {
 
   private KafkaConsumer<byte[], byte[]> kafkaConsumer;
-  private KafkaSubScanSpec subScanSpec;
+  private KafkaPartitionScanSpec subScanSpec;
 
   @Before
   public void setUp() {
@@ -49,7 +48,7 @@ public class MessageIteratorTest extends KafkaTestBase {
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
     consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
     kafkaConsumer = new KafkaConsumer<>(consumerProps);
-    subScanSpec = new KafkaSubScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG);
+    subScanSpec = new KafkaPartitionScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG);
   }
 
   @After
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index ed01747..ecf998e 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -27,6 +27,7 @@ import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.ZookeeperTestUtil;
 import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.security.JaasUtils;
 
@@ -46,7 +47,8 @@ import kafka.utils.ZkUtils;
 
 @Category({KafkaStorageTest.class, SlowTest.class})
 @RunWith(Suite.class)
-@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class })
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class,
+    KafkaFilterPushdownTest.class })
 public class TestKafkaSuit {
   private static final Logger logger = 
LoggerFactory.getLogger(LoggerFactory.class);
   private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
@@ -106,4 +108,18 @@ public class TestKafkaSuit {
     }
   }
 
+  /**
+   * Creates a topic with the given number of partitions, configured with
+   * CreateTime message timestamps so that tests control the timestamps they
+   * later filter on.
+   */
+  public static void createTopicHelper(final String topicName, final int partitions) {
+    Properties topicProps = new Properties();
+    topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
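+    // CreateTime keeps the producer-supplied timestamps; LogAppendTime would
+    // overwrite them with the broker's clock at append time.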
+    ZkUtils zkUtils = new ZkUtils(zkClient,
+        new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
+    AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
+        topicProps, RackAwareMode.Disabled$.MODULE$);
+
+    org.apache.kafka.common.requests.MetadataResponse.TopicMetadata topicMetadata =
+        AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
+    logger.info("Topic Metadata: {}", topicMetadata);
+  }
+
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
index 057af7e..b3163ad 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
@@ -29,12 +29,24 @@ public interface TestQueryConstants {
   int MAX_CLIENT_CONNECTIONS = 100;
 
   String JSON_TOPIC = "drill-json-topic";
+  String JSON_PUSHDOWN_TOPIC = "drill-pushdown-topic";
   String AVRO_TOPIC = "drill-avro-topic";
   String INVALID_TOPIC = "invalid-topic";
 
+  String KAFKA_MSG_TIMESTAMP_FIELD = "kafkaMsgTimestamp";
+  String KAFKA_PARTITION_ID_FIELD = "kafkaPartitionId";
+  String KAFKA_MSG_OFFSET_FIELD = "kafkaMsgOffset";
+
   // Queries
   String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
   String MSG_SELECT_QUERY = "select * from kafka.`%s`";
   String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
   String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";
+
+  String QUERY_TEMPLATE_BASIC = "select * from kafka.`%s` where %s";
+  String QUERY_TEMPLATE_AND = "select * from kafka.`%s` where %s AND %s";
+  String QUERY_TEMPLATE_OR = "select * from kafka.`%s` where %s OR %s";
+  String QUERY_TEMPLATE_AND_OR_PATTERN_1 = "select * from kafka.`%s` where %s AND (%s OR %s)";
+  String QUERY_TEMPLATE_AND_OR_PATTERN_2 = "select * from kafka.`%s` where %s OR (%s AND %s)";
+  String QUERY_TEMPLATE_AND_OR_PATTERN_3 = "select * from kafka.`%s` where (%s OR %s) AND (%s OR %s)";
 }
