This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 287adec DRILL-5977: Implement Filter Pushdown in Drill-Kafka plugin
287adec is described below
commit 287adec537f654e548404d1d337133fd21a1a937
Author: Abhishek Ravi <[email protected]>
AuthorDate: Sun Apr 8 14:01:31 2018 -0700
DRILL-5977: Implement Filter Pushdown in Drill-Kafka plugin
closes #1272
---
.../drill/exec/store/kafka/KafkaGroupScan.java | 90 +++---
.../drill/exec/store/kafka/KafkaNodeProcessor.java | 186 +++++++++++
.../exec/store/kafka/KafkaPartitionScanSpec.java | 100 ++++++
.../store/kafka/KafkaPartitionScanSpecBuilder.java | 345 ++++++++++++++++++++
.../store/kafka/KafkaPushDownFilterIntoScan.java | 81 +++++
.../drill/exec/store/kafka/KafkaRecordReader.java | 5 +-
.../exec/store/kafka/KafkaScanBatchCreator.java | 2 +-
.../drill/exec/store/kafka/KafkaStoragePlugin.java | 2 +-
.../drill/exec/store/kafka/KafkaSubScan.java | 72 +----
.../drill/exec/store/kafka/MessageIterator.java | 3 +-
.../exec/store/kafka/KafkaFilterPushdownTest.java | 359 +++++++++++++++++++++
.../exec/store/kafka/KafkaMessageGenerator.java | 34 ++
.../exec/store/kafka/MessageIteratorTest.java | 5 +-
.../drill/exec/store/kafka/TestKafkaSuit.java | 18 +-
.../drill/exec/store/kafka/TestQueryConstants.java | 12 +
15 files changed, 1197 insertions(+), 117 deletions(-)
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index 9cf575b..976c82a 100644
---
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -19,10 +19,12 @@ package org.apache.drill.exec.store.kafka;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
@@ -35,7 +37,6 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.CompleteWork;
@@ -72,10 +73,11 @@ public class KafkaGroupScan extends AbstractGroupScan {
private final KafkaScanSpec kafkaScanSpec;
private List<SchemaPath> columns;
- private List<PartitionScanWork> partitionWorkList;
private ListMultimap<Integer, PartitionScanWork> assignments;
private List<EndpointAffinity> affinities;
+ private Map<TopicPartition, PartitionScanWork> partitionWorkMap;
+
@JsonCreator
public KafkaGroupScan(@JsonProperty("userName") String userName,
@JsonProperty("kafkaStoragePluginConfig")
KafkaStoragePluginConfig kafkaStoragePluginConfig,
@@ -112,34 +114,18 @@ public class KafkaGroupScan extends AbstractGroupScan {
this.kafkaStoragePlugin = that.kafkaStoragePlugin;
this.columns = that.columns;
this.kafkaScanSpec = that.kafkaScanSpec;
- this.partitionWorkList = that.partitionWorkList;
this.assignments = that.assignments;
+ this.partitionWorkMap = that.partitionWorkMap;
}
- private static class PartitionScanWork implements CompleteWork {
-
- private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
-
- private final TopicPartition topicPartition;
- private final long beginOffset;
- private final long latestOffset;
-
- public PartitionScanWork(TopicPartition topicPartition, long beginOffset,
long latestOffset) {
- this.topicPartition = topicPartition;
- this.beginOffset = beginOffset;
- this.latestOffset = latestOffset;
- }
-
- public TopicPartition getTopicPartition() {
- return topicPartition;
- }
+ public static class PartitionScanWork implements CompleteWork {
- public long getBeginOffset() {
- return beginOffset;
- }
+ private final EndpointByteMapImpl byteMap;
+ private final KafkaPartitionScanSpec partitionScanSpec;
- public long getLatestOffset() {
- return latestOffset;
+ public PartitionScanWork(EndpointByteMap byteMap, KafkaPartitionScanSpec
partitionScanSpec) {
+ this.byteMap = (EndpointByteMapImpl)byteMap;
+ this.partitionScanSpec = partitionScanSpec;
}
@Override
@@ -149,7 +135,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
@Override
public long getTotalBytes() {
- return (latestOffset - beginOffset) * MSG_SIZE;
+ return (partitionScanSpec.getEndOffset() -
partitionScanSpec.getStartOffset()) * MSG_SIZE;
}
@Override
@@ -157,6 +143,9 @@ public class KafkaGroupScan extends AbstractGroupScan {
return byteMap;
}
+ public KafkaPartitionScanSpec getPartitionScanSpec() {
+ return partitionScanSpec;
+ }
}
/**
@@ -164,7 +153,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
* corresponding topicPartition
*/
private void init() {
- partitionWorkList = Lists.newArrayList();
+ partitionWorkMap = Maps.newHashMap();
Collection<DrillbitEndpoint> endpoints =
kafkaStoragePlugin.getContext().getBits();
Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
for (DrillbitEndpoint endpoint : endpoints) {
@@ -211,12 +200,13 @@ public class KafkaGroupScan extends AbstractGroupScan {
// computes work for each end point
for (PartitionInfo partitionInfo : topicPartitions) {
- TopicPartition topicPartition = new TopicPartition(topicName,
partitionInfo.partition());
+ TopicPartition topicPartition = new
TopicPartition(partitionInfo.topic(), partitionInfo.partition());
long lastCommittedOffset = startOffsetsMap.get(topicPartition);
long latestOffset = endOffsetsMap.get(topicPartition);
logger.debug("Latest offset of {} is {}", topicPartition, latestOffset);
logger.debug("Last committed offset of {} is {}", topicPartition,
lastCommittedOffset);
- PartitionScanWork work = new PartitionScanWork(topicPartition,
lastCommittedOffset, latestOffset);
+ KafkaPartitionScanSpec partitionScanSpec = new
KafkaPartitionScanSpec(topicPartition.topic(), topicPartition.partition(),
lastCommittedOffset, latestOffset);
+ PartitionScanWork work = new PartitionScanWork(new
EndpointByteMapImpl(), partitionScanSpec);
Node[] inSyncReplicas = partitionInfo.inSyncReplicas();
for (Node isr : inSyncReplicas) {
String host = isr.host();
@@ -225,23 +215,22 @@ public class KafkaGroupScan extends AbstractGroupScan {
work.getByteMap().add(ep, work.getTotalBytes());
}
}
- partitionWorkList.add(work);
+ partitionWorkMap.put(topicPartition, work);
}
}
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- assignments = AssignmentCreator.getMappings(incomingEndpoints,
partitionWorkList);
+ assignments = AssignmentCreator.getMappings(incomingEndpoints,
Lists.newArrayList(partitionWorkMap.values()));
}
@Override
public KafkaSubScan getSpecificScan(int minorFragmentId) {
List<PartitionScanWork> workList = assignments.get(minorFragmentId);
- List<KafkaSubScanSpec> scanSpecList = Lists.newArrayList();
+ List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
for (PartitionScanWork work : workList) {
- scanSpecList.add(new KafkaSubScanSpec(work.getTopicPartition().topic(),
work.getTopicPartition().partition(),
- work.getBeginOffset(), work.getLatestOffset()));
+ scanSpecList.add(work.partitionScanSpec);
}
return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns,
scanSpecList);
@@ -249,14 +238,14 @@ public class KafkaGroupScan extends AbstractGroupScan {
@Override
public int getMaxParallelizationWidth() {
- return partitionWorkList.size();
+ return partitionWorkMap.values().size();
}
@Override
public ScanStats getScanStats() {
long messageCount = 0;
- for (PartitionScanWork work : partitionWorkList) {
- messageCount += (work.getLatestOffset() - work.getBeginOffset());
+ for (PartitionScanWork work : partitionWorkMap.values()) {
+ messageCount += (work.getPartitionScanSpec().getEndOffset() -
work.getPartitionScanSpec().getStartOffset());
}
return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, messageCount, 1,
messageCount * MSG_SIZE);
}
@@ -275,7 +264,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
@Override
public List<EndpointAffinity> getOperatorAffinity() {
if (affinities == null) {
- affinities = AffinityCreator.getAffinityMap(partitionWorkList);
+ affinities =
AffinityCreator.getAffinityMap(Lists.newArrayList(partitionWorkMap.values()));
}
return affinities;
}
@@ -293,6 +282,23 @@ public class KafkaGroupScan extends AbstractGroupScan {
return clone;
}
+  /**
+   * Creates a copy of this group scan that reads only the given per-partition scan ranges
+   * (typically the result of filter pushdown).
+   *
+   * <p>The clone gets a brand-new {@code partitionWorkMap}. The copy constructor shares the map
+   * reference with {@code this}, so editing that shared map in place would also rewrite the
+   * partitions, scan statistics and assignments of this (un-pushed-down) group scan.
+   *
+   * @param partitionScanSpecList scan ranges to keep; each must name a topic-partition covered by
+   *                              this group scan, at most once
+   * @return a new group scan covering exactly the partitions in {@code partitionScanSpecList}
+   * @throws IllegalArgumentException if a spec names an unknown topic-partition or a
+   *                                  topic-partition appears more than once
+   */
+  public GroupScan cloneWithNewSpec(List<KafkaPartitionScanSpec> partitionScanSpecList) {
+    KafkaGroupScan clone = new KafkaGroupScan(this);
+    HashSet<TopicPartition> partitionsInSpec = Sets.newHashSet();
+    Map<TopicPartition, PartitionScanWork> newPartitionWorkMap = Maps.newHashMap();
+
+    for (KafkaPartitionScanSpec scanSpec : partitionScanSpecList) {
+      TopicPartition tp = new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId());
+      if (!partitionsInSpec.add(tp)) {
+        throw new IllegalArgumentException("Duplicate partition scan spec for " + tp);
+      }
+
+      PartitionScanWork existingWork = partitionWorkMap.get(tp);
+      if (existingWork == null) {
+        throw new IllegalArgumentException("No partition scan work found for " + tp);
+      }
+
+      // Reuse the endpoint byte map computed for the partition; only the offset range changes.
+      newPartitionWorkMap.put(tp, new PartitionScanWork(existingWork.getByteMap(), scanSpec));
+    }
+
+    // Partitions missing from the spec list are dropped simply by not being copied.
+    clone.partitionWorkMap = newPartitionWorkMap;
+    return clone;
+  }
+
@JsonProperty
public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
return kafkaStoragePlugin.getConfig();
@@ -318,4 +324,12 @@ public class KafkaGroupScan extends AbstractGroupScan {
return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]",
kafkaScanSpec, columns);
}
+
+  /**
+   * Returns copies of the per-partition scan specs currently held by this group scan.
+   * Each element is a {@code clone()} of the stored spec, so mutating a returned spec does not
+   * change this group scan.
+   *
+   * @return a new mutable list with one spec per partition in the scan
+   */
+  @JsonIgnore
+  public List<KafkaPartitionScanSpec> getPartitionScanSpecList() {
+    List<KafkaPartitionScanSpec> specCopies = Lists.newArrayListWithCapacity(partitionWorkMap.size());
+    partitionWorkMap.values().forEach(work -> specCopies.add(work.getPartitionScanSpec().clone()));
+    return specCopies;
+  }
}
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
new file mode 100644
index 0000000..ba39b76
--- /dev/null
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Examines one comparison {@link FunctionCall} from a filter condition and decides whether it
+ * can be pushed down into the Kafka scan.
+ *
+ * <p>Pushdown is possible when one operand is the column {@code kafkaMsgOffset},
+ * {@code kafkaPartitionId} or {@code kafkaMsgTimestamp} and the other is a supported literal.
+ * If the literal is on the left-hand side, the operands are swapped and the operator is mirrored
+ * (e.g. {@code 10 < kafkaMsgOffset} is treated as {@code kafkaMsgOffset > 10}).
+ *
+ * <p>After {@link #process(FunctionCall)}, {@link #isSuccess()} reports whether pushdown is
+ * possible. {@link #getPath()}, {@link #getFunctionName()} and {@link #getValue()} then describe
+ * the normalized comparison.
+ */
+class KafkaNodeProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+  // Comparison being evaluated; replaced by its mirrored form when the operands are swapped.
+  private String functionName;
+  // Whether the comparison can be pushed down.
+  private Boolean success;
+  // Literal operand converted to a long.
+  private Long value;
+  // Root segment name of the column operand.
+  private String path;
+
+  public KafkaNodeProcessor(String functionName) {
+    this.functionName = functionName;
+    this.success = false;
+  }
+
+  /**
+   * @param functionName Drill function name, e.g. {@code "equal"} or {@code "less_than"}
+   * @return true if the function is a comparison this processor can push down
+   */
+  public static boolean isPushdownFunction(String functionName) {
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+    // Any operand other than a plain column reference cannot be pushed down.
+    return false;
+  }
+
+  /**
+   * Analyzes {@code call} and returns a processor describing the result.
+   *
+   * @param call comparison function call taken from the filter condition
+   * @return the processor; check {@link #isSuccess()} before using the other getters
+   */
+  public static KafkaNodeProcessor process(FunctionCall call) {
+    String functionName = call.getName();
+    KafkaNodeProcessor evaluator = new KafkaNodeProcessor(functionName);
+    if (call.args.isEmpty()) {
+      return evaluator; // nothing to analyze; success stays false
+    }
+
+    LogicalExpression nameArg = call.args.get(0);
+    LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
+
+    // Literal on the left: swap the operands and mirror the operator. Only comparisons with a
+    // known mirrored form are swapped; otherwise the function name would become null.
+    String transposedName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+    if (transposedName != null && VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+      LogicalExpression swapArg = valueArg;
+      valueArg = nameArg;
+      nameArg = swapArg;
+      evaluator.functionName = transposedName;
+    }
+
+    if (nameArg == null) {
+      return evaluator; // a lone literal operand; nothing to push down
+    }
+    evaluator.success = nameArg.accept(evaluator, valueArg);
+    return evaluator;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public Long getValue() {
+    return value;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+    this.path = path.getRootSegmentPath();
+
+    if (valueArg == null) {
+      return false;
+    }
+
+    switch (this.path) {
+      case "kafkaMsgOffset":
+        // Do not pushdown "not_equal" on kafkaMsgOffset.
+        if (functionName.equals("not_equal")) {
+          return false;
+        }
+        // Intentional fall-through: offsets accept the same integral literals as partition ids.
+      case "kafkaPartitionId":
+        if (valueArg instanceof IntExpression) {
+          value = (long) ((IntExpression) valueArg).getInt();
+          return true;
+        }
+
+        if (valueArg instanceof LongExpression) {
+          value = ((LongExpression) valueArg).getLong();
+          return true;
+        }
+        break;
+      case "kafkaMsgTimestamp":
+        // Only "equal", "greater_than" and "greater_than_or_equal_to" are pushed down on
+        // kafkaMsgTimestamp.
+        if (!functionName.equals("equal") && !functionName.equals("greater_than")
+            && !functionName.equals("greater_than_or_equal_to")) {
+          return false;
+        }
+
+        if (valueArg instanceof LongExpression) {
+          value = ((LongExpression) valueArg).getLong();
+          return true;
+        }
+
+        if (valueArg instanceof DateExpression) {
+          value = ((DateExpression) valueArg).getDate();
+          return true;
+        }
+
+        if (valueArg instanceof TimeExpression) {
+          value = (long) ((TimeExpression) valueArg).getTime();
+          return true;
+        }
+
+        if (valueArg instanceof TimeStampExpression) {
+          value = ((TimeStampExpression) valueArg).getTimeStamp();
+          return true;
+        }
+
+        if (valueArg instanceof IntExpression) {
+          value = (long) ((IntExpression) valueArg).getInt();
+          return true;
+        }
+        break;
+    }
+    return false;
+  }
+
+  // Literal expression types that trigger operand swapping when they appear on the left-hand side.
+  // TimeStampExpression is included so that e.g. TIMESTAMP '...' < kafkaMsgTimestamp is normalized
+  // and reaches the TimeStampExpression handling in visitSchemaPath.
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+        .add(BooleanExpression.class)
+        .add(DateExpression.class)
+        .add(DoubleExpression.class)
+        .add(FloatExpression.class)
+        .add(IntExpression.class)
+        .add(LongExpression.class)
+        .add(QuotedString.class)
+        .add(TimeExpression.class)
+        .add(TimeStampExpression.class)
+        .build();
+  }
+
+  // Pushdown-capable comparisons, each mapped to its mirrored form (a OP b == b MIRROR(OP) a).
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+        .put("equal", "equal")
+        .put("not_equal", "not_equal")
+        .put("greater_than_or_equal_to", "less_than_or_equal_to")
+        .put("greater_than", "less_than")
+        .put("less_than_or_equal_to", "greater_than_or_equal_to")
+        .put("less_than", "greater_than")
+        .build();
+  }
+
+}
+
+
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
new file mode 100644
index 0000000..713f62e
--- /dev/null
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Offset range to scan for one Kafka topic-partition: messages with offsets in
+ * {@code [startOffset, endOffset)} are read, so {@code startOffset == endOffset} selects nothing.
+ *
+ * <p>Instances are mutable: {@link #mergeScanSpec(String, KafkaPartitionScanSpec)} updates the
+ * range in place. Avoid using them as hash keys while they may still be merged.
+ */
+public class KafkaPartitionScanSpec {
+  private String topicName;
+  private int partitionId;
+  private long startOffset;
+  private long endOffset;
+
+  @JsonCreator
+  public KafkaPartitionScanSpec(@JsonProperty("topicName") String topicName,
+                                @JsonProperty("partitionId") int partitionId,
+                                @JsonProperty("startOffset") long startOffset,
+                                @JsonProperty("endOffset") long endOffset) {
+    this.topicName = topicName;
+    this.partitionId = partitionId;
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getEndOffset() {
+    return endOffset;
+  }
+
+  /**
+   * Combines {@code scanSpec}'s range into this one, in place.
+   *
+   * <ul>
+   *   <li>{@code "booleanAnd"}: intersection. Disjoint ranges collapse to an empty range instead
+   *       of leaving {@code startOffset > endOffset}, which would make
+   *       {@code endOffset - startOffset} negative.</li>
+   *   <li>{@code "booleanOr"}: the smallest single range covering both (it may include a gap
+   *       between disjoint ranges). Empty ranges contribute nothing.</li>
+   *   <li>Any other name leaves this spec unchanged.</li>
+   * </ul>
+   *
+   * @param functionName {@code "booleanAnd"} or {@code "booleanOr"}
+   * @param scanSpec     range to combine with; not modified
+   */
+  public void mergeScanSpec(String functionName, KafkaPartitionScanSpec scanSpec) {
+    switch (functionName) {
+      case "booleanAnd":
+        // Reduce the scan range.
+        startOffset = Math.max(startOffset, scanSpec.startOffset);
+        endOffset = Math.min(endOffset, scanSpec.endOffset);
+        if (startOffset > endOffset) {
+          startOffset = endOffset;
+        }
+        break;
+      case "booleanOr":
+        // Increase the scan range, ignoring empty operands so they cannot widen the union.
+        if (scanSpec.coversNoOffsets()) {
+          break;
+        }
+        if (coversNoOffsets()) {
+          startOffset = scanSpec.startOffset;
+          endOffset = scanSpec.endOffset;
+          break;
+        }
+        startOffset = Math.min(startOffset, scanSpec.startOffset);
+        endOffset = Math.max(endOffset, scanSpec.endOffset);
+        break;
+      default:
+        break;
+    }
+  }
+
+  // True when the half-open range [startOffset, endOffset) is empty. Private so that Jackson does
+  // not treat it as a serializable property.
+  private boolean coversNoOffsets() {
+    return startOffset >= endOffset;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof KafkaPartitionScanSpec)) {
+      return false;
+    }
+    KafkaPartitionScanSpec that = (KafkaPartitionScanSpec) obj;
+    return partitionId == that.partitionId
+        && startOffset == that.startOffset
+        && endOffset == that.endOffset
+        && (topicName == null ? that.topicName == null : topicName.equals(that.topicName));
+  }
+
+  // Must agree with equals(): hashes exactly the fields equals() compares.
+  @Override
+  public int hashCode() {
+    int result = topicName == null ? 0 : topicName.hashCode();
+    result = 31 * result + partitionId;
+    result = 31 * result + Long.hashCode(startOffset);
+    result = 31 * result + Long.hashCode(endOffset);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaPartitionScanSpec [topicName=" + topicName + ", partitionId=" + partitionId + ", startOffset="
+        + startOffset + ", endOffset=" + endOffset + "]";
+  }
+
+  /** Returns an independent copy with the same topic, partition and offset range. */
+  @Override
+  public KafkaPartitionScanSpec clone() {
+    return new KafkaPartitionScanSpec(topicName, partitionId, startOffset, endOffset);
+  }
+}
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
new file mode 100644
index 0000000..b52ed44
--- /dev/null
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Translates a filter condition into per-partition Kafka scan ranges.
+ *
+ * <p>Comparisons on {@code kafkaMsgOffset}, {@code kafkaPartitionId} and
+ * {@code kafkaMsgTimestamp} are turned into {@link KafkaPartitionScanSpec}s.
+ * {@code booleanAnd} and {@code booleanOr} combine those specs via
+ * {@link KafkaPartitionScanSpec#mergeScanSpec(String, KafkaPartitionScanSpec)}. A {@code null}
+ * result from a visit means "this expression cannot be pushed down".
+ *
+ * <p>The constructor opens a {@link KafkaConsumer} (used to resolve timestamps to offsets);
+ * callers release it with {@link #close()}.
+ */
+public class KafkaPartitionScanSpecBuilder extends
+    AbstractExprVisitor<List<KafkaPartitionScanSpec>, Void, RuntimeException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPartitionScanSpecBuilder.class);
+  private final LogicalExpression le;
+  private final KafkaGroupScan groupScan;
+  private final KafkaConsumer<?, ?> kafkaConsumer;
+  // Unfiltered range of every topic-partition in the group scan, built by parseTree(). The spec
+  // objects in it must never be mutated: merging always happens on copies (see visitBooleanOperator).
+  private ImmutableMap<TopicPartition, KafkaPartitionScanSpec> fullScanSpec;
+  private static final long CLOSE_TIMEOUT_MS = 200;
+
+  public KafkaPartitionScanSpecBuilder(KafkaGroupScan groupScan, LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    le = conditionExp;
+  }
+
+  /**
+   * Evaluates the filter condition against the group scan's current partitions.
+   *
+   * @return the scan specs to use, or {@code null} if the condition cannot be pushed down
+   */
+  public List<KafkaPartitionScanSpec> parseTree() {
+    ImmutableMap.Builder<TopicPartition, KafkaPartitionScanSpec> builder = ImmutableMap.builder();
+    for (KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) {
+      builder.put(new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId()), scanSpec);
+    }
+    fullScanSpec = builder.build();
+    List<KafkaPartitionScanSpec> pushdownSpec = le.accept(this, null);
+
+    /*
+     Non-existing / invalid partitions may result in empty scan spec.
+     This results in a "ScanBatch" with no reader. DRILL currently requires
+     at least one reader to be present in a scan batch.
+     */
+    if (pushdownSpec != null && pushdownSpec.isEmpty()) {
+      KafkaPartitionScanSpec emptySpec = createEmptyPlaceholderSpec();
+      if (emptySpec == null) {
+        return null; // no partition to attach a placeholder reader to; keep the original scan
+      }
+      pushdownSpec.add(emptySpec);
+    }
+    return pushdownSpec;
+  }
+
+  // Builds a zero-length spec at the end of one partition (partition 0 when it is part of the
+  // scan, otherwise the first known partition), or returns null if the scan has no partitions.
+  private KafkaPartitionScanSpec createEmptyPlaceholderSpec() {
+    TopicPartition firstPartition = new TopicPartition(groupScan.getKafkaScanSpec().getTopicName(), 0);
+    KafkaPartitionScanSpec template = fullScanSpec.get(firstPartition);
+    if (template == null) {
+      if (fullScanSpec.isEmpty()) {
+        return null;
+      }
+      template = fullScanSpec.values().iterator().next();
+    }
+    return new KafkaPartitionScanSpec(template.getTopicName(), template.getPartitionId(),
+        template.getEndOffset(), template.getEndOffset());
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value)
+      throws RuntimeException {
+    return null;
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value)
+      throws RuntimeException {
+
+    Map<TopicPartition, KafkaPartitionScanSpec> specMap = Maps.newHashMap();
+    ImmutableList<LogicalExpression> args = op.args;
+    if (op.getName().equals("booleanOr")) {
+
+      for (LogicalExpression expr : args) {
+        List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
+        // parsedSpec is null if expression cannot be pushed down
+        if (parsedSpec == null) {
+          return null; // At any level, all arguments of booleanOr should support pushdown, else return null
+        }
+        for (KafkaPartitionScanSpec newSpec : parsedSpec) {
+          TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
+          KafkaPartitionScanSpec existingSpec = specMap.get(tp);
+
+          if (existingSpec == null) {
+            specMap.put(tp, newSpec); // First range seen for this topic-partition
+          } else {
+            existingSpec.mergeScanSpec(op.getName(), newSpec); // merged in place
+          }
+        }
+      }
+    } else { // booleanAnd
+      // Start from private copies of the full ranges. mergeScanSpec mutates in place, and putting
+      // fullScanSpec's own objects here would narrow fullScanSpec itself, corrupting every
+      // comparison evaluated afterwards (e.g. the other side of an enclosing OR).
+      for (Map.Entry<TopicPartition, KafkaPartitionScanSpec> entry : fullScanSpec.entrySet()) {
+        specMap.put(entry.getKey(), entry.getValue().clone());
+      }
+
+      for (LogicalExpression expr : args) {
+        List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
+
+        // parsedSpec is null if expression cannot be pushed down; it is then simply not used to
+        // narrow the ranges.
+        if (parsedSpec != null) {
+          Set<TopicPartition> partitionsInNewSpec = Sets.newHashSet(); // topic-partitions returned from new spec
+
+          for (KafkaPartitionScanSpec newSpec : parsedSpec) {
+            TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
+            partitionsInNewSpec.add(tp);
+            KafkaPartitionScanSpec existingSpec = specMap.get(tp);
+
+            if (existingSpec != null) {
+              existingSpec.mergeScanSpec(op.getName(), newSpec);
+            }
+          }
+
+          /*
+           For "booleanAnd", handle the case where condition is on `kafkaPartitionId`.
+           In this case, we would not want to unnecessarily scan all the topic-partitions.
+           Hence we remove the unnecessary topic-partitions from the spec.
+           */
+          specMap.keySet().removeIf(partition -> !partitionsInNewSpec.contains(partition));
+        }
+      }
+    }
+    return Lists.newArrayList(specMap.values());
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value)
+      throws RuntimeException {
+
+    String functionName = call.getName();
+    if (KafkaNodeProcessor.isPushdownFunction(functionName)) {
+
+      KafkaNodeProcessor kafkaNodeProcessor = KafkaNodeProcessor.process(call);
+      if (kafkaNodeProcessor.isSuccess()) {
+        switch (kafkaNodeProcessor.getPath()) {
+          case "kafkaMsgTimestamp":
+            return createScanSpecForTimestamp(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+          case "kafkaMsgOffset":
+            return createScanSpecForOffset(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+          case "kafkaPartitionId":
+            return createScanSpecForPartition(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+        }
+      }
+    }
+    return null; // Return null, do not pushdown
+  }
+
+  // Starts each partition's range at the first offset whose timestamp is >= the literal
+  // (> for greater_than) as reported by KafkaConsumer.offsetsForTimes; empty if there is none.
+  private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName,
+                                                                  Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
+    Map<TopicPartition, Long> timesValMap = Maps.newHashMap();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
+    for (TopicPartition partition : topicPartitions) {
+      timesValMap.put(partition, functionName.equals("greater_than") ? fieldValue + 1 : fieldValue);
+    }
+
+    Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = kafkaConsumer.offsetsForTimes(timesValMap);
+
+    for (TopicPartition tp : topicPartitions) {
+      OffsetAndTimestamp value = offsetAndTimestamp.get(tp);
+      long endOffset = fullScanSpec.get(tp).getEndOffset();
+      // OffsetAndTimestamp is null if there is no offset greater or equal to requested timestamp
+      long startOffset = value == null ? endOffset : value.offset();
+      scanSpec.add(new KafkaPartitionScanSpec(tp.topic(), tp.partition(), startOffset, endOffset));
+    }
+
+    return scanSpec;
+  }
+
+  private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName,
+                                                               Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
+    /*
+     We should handle the case where the specified offset does not exist in the current context,
+     i.e., fieldValue < startOffset or fieldValue > endOffset in a particular topic-partition.
+     Else, KafkaConsumer.poll will throw "TimeoutException".
+     */
+
+    switch (functionName) {
+      case "equal":
+        for (TopicPartition tp : topicPartitions) {
+          if (fieldValue < fullScanSpec.get(tp).getStartOffset()) {
+            // Offset does not exist
+            scanSpec.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset()));
+          } else {
+            long val = Math.min(fieldValue, fullScanSpec.get(tp).getEndOffset());
+            long nextVal = Math.min(val + 1, fullScanSpec.get(tp).getEndOffset());
+            scanSpec.add(new KafkaPartitionScanSpec(tp.topic(), tp.partition(), val, nextVal));
+          }
+        }
+        break;
+      case "greater_than_or_equal_to":
+        for (TopicPartition tp : topicPartitions) {
+          // Ensure scan range is between startOffset and endOffset
+          long val = bindOffsetToRange(tp, fieldValue);
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(), val,
+                  fullScanSpec.get(tp).getEndOffset()));
+        }
+        break;
+      case "greater_than":
+        for (TopicPartition tp : topicPartitions) {
+          // Ensure scan range is between startOffset and endOffset
+          long val = bindOffsetToRange(tp, fieldValue + 1);
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  val, fullScanSpec.get(tp).getEndOffset()));
+        }
+        break;
+      case "less_than_or_equal_to":
+        for (TopicPartition tp : topicPartitions) {
+          // Ensure scan range is between startOffset and endOffset; end offset is exclusive
+          long val = bindOffsetToRange(tp, fieldValue + 1);
+
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  fullScanSpec.get(tp).getStartOffset(), val));
+        }
+        break;
+      case "less_than":
+        for (TopicPartition tp : topicPartitions) {
+          // Ensure scan range is between startOffset and endOffset
+          long val = bindOffsetToRange(tp, fieldValue);
+
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  fullScanSpec.get(tp).getStartOffset(), val));
+        }
+        break;
+    }
+    return scanSpec;
+  }
+
+  // Keeps the full range of every partition whose id satisfies the comparison; drops the others.
+  private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName,
+                                                                  Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
+
+    for (TopicPartition tp : fullScanSpec.keySet()) {
+      if (partitionMatches(functionName, tp.partition(), fieldValue)) {
+        KafkaPartitionScanSpec full = fullScanSpec.get(tp);
+        scanSpecList.add(
+            new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                full.getStartOffset(), full.getEndOffset()));
+      }
+    }
+    return scanSpecList;
+  }
+
+  // Evaluates "partition <functionName> value"; unknown functions match nothing.
+  private static boolean partitionMatches(String functionName, int partition, long value) {
+    switch (functionName) {
+      case "equal":
+        return partition == value;
+      case "not_equal":
+        return partition != value;
+      case "greater_than_or_equal_to":
+        return partition >= value;
+      case "greater_than":
+        return partition > value;
+      case "less_than_or_equal_to":
+        return partition <= value;
+      case "less_than":
+        return partition < value;
+      default:
+        return false;
+    }
+  }
+
+  /** Closes the Kafka consumer opened by the constructor. */
+  void close() {
+    kafkaConsumer.close(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+  }
+
+  // Clamps offset into [startOffset, endOffset] of the partition's full range.
+  private long bindOffsetToRange(TopicPartition tp, long offset) {
+    return Math.max(fullScanSpec.get(tp).getStartOffset(), Math.min(offset, fullScanSpec.get(tp).getEndOffset()));
+  }
+}
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
new file mode 100644
index 0000000..bf11f85
--- /dev/null
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import java.util.List;
+
+/**
+ * Physical planner rule matching a {@link FilterPrel} directly on top of a {@link ScanPrel} over a
+ * {@link KafkaGroupScan}. It lets {@link KafkaPartitionScanSpecBuilder} derive a narrower list of
+ * partition scan specs from the filter condition and, when it does, replaces the scan with one
+ * using that list.
+ *
+ * <p>The original filter is kept above the new scan, so any rows the narrowed scan still returns
+ * but that do not satisfy the condition are removed as before.
+ */
+public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class);
+
+  public static final StoragePluginOptimizerRule INSTANCE =
+      new KafkaPushDownFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+          "KafkaPushFilterIntoScan:Filter_On_Scan");
+
+  private KafkaPushDownFilterIntoScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  /**
+   * Converts the filter condition to a Drill {@link LogicalExpression} and asks
+   * {@link KafkaPartitionScanSpecBuilder#parseTree()} for a new scan spec list. A {@code null}
+   * result means the condition cannot be pushed down and the plan is left untouched. Otherwise
+   * the group scan is cloned with the new spec list and the filter is re-applied on top of it.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final ScanPrel scan = call.rel(1);
+    final FilterPrel filter = call.rel(0);
+    final RexNode condition = filter.getCondition();
+
+    final LogicalExpression conditionExp =
+        DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+
+    final KafkaGroupScan groupScan = (KafkaGroupScan) scan.getGroupScan();
+    logger.info("Partitions ScanSpec before pushdown: {}", groupScan.getPartitionScanSpecList());
+
+    final List<KafkaPartitionScanSpec> newScanSpec;
+    final KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp);
+    try {
+      newScanSpec = builder.parseTree();
+    } finally {
+      // The builder holds a Kafka consumer; release it even if parsing the condition fails.
+      builder.close();
+    }
+
+    // No pushdown possible for this condition.
+    if (newScanSpec == null) {
+      return;
+    }
+
+    logger.info("Partitions ScanSpec after pushdown: {}", newScanSpec);
+    final GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec);
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupScan, scan.getRowType());
+    call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(newScanPrel)));
+  }
+
+  /**
+   * Restricts the rule to scans backed by a {@link KafkaGroupScan}; all other checks are
+   * delegated to the superclass.
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scan = call.rel(1);
+    return scan.getGroupScan() instanceof KafkaGroupScan && super.matches(call);
+  }
+}
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index b1cf9cd..a0fc1f1 100644
---
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -31,7 +31,6 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -52,7 +51,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
private final boolean unionEnabled;
private final KafkaStoragePlugin plugin;
- private final KafkaSubScanSpec subScanSpec;
+ private final KafkaPartitionScanSpec subScanSpec;
private final long kafkaPollTimeOut;
private long currentOffset;
@@ -62,7 +61,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
private final boolean readNumbersAsDouble;
private final String kafkaMsgReader;
- public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
+ public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
FragmentContext context, KafkaStoragePlugin plugin) {
setColumns(projectedColumns);
final OptionManager optionManager = context.getOptions();
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 9bedbd5..ae78d8c 100644
---
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -44,7 +44,7 @@ public class KafkaScanBatchCreator implements
BatchCreator<KafkaSubScan> {
List<SchemaPath> columns = subScan.getColumns() != null ?
subScan.getColumns() : GroupScan.ALL_COLUMNS;
List<RecordReader> readers = new LinkedList<>();
- for (KafkaSubScan.KafkaSubScanSpec scanSpec :
subScan.getPartitionSubScanSpecList()) {
+ for (KafkaPartitionScanSpec scanSpec :
subScan.getPartitionSubScanSpecList()) {
readers.add(new KafkaRecordReader(scanSpec, columns, context,
subScan.getKafkaStoragePlugin()));
}
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index 157e367..4ca91ec 100644
---
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -71,7 +71,7 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin
{
@Override
public Set<StoragePluginOptimizerRule>
getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- return ImmutableSet.of();
+ return ImmutableSet.of(KafkaPushDownFilterIntoScan.INSTANCE);
}
@Override
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index 468f766..d62faa6 100644
---
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -43,14 +43,14 @@ public class KafkaSubScan extends AbstractBase implements
SubScan {
private final KafkaStoragePlugin kafkaStoragePlugin;
private final List<SchemaPath> columns;
- private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+ private final List<KafkaPartitionScanSpec> partitionSubScanSpecList;
@JsonCreator
public KafkaSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("kafkaStoragePluginConfig")
KafkaStoragePluginConfig kafkaStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("partitionSubScanSpecList")
LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
+ @JsonProperty("partitionSubScanSpecList")
LinkedList<KafkaPartitionScanSpec> partitionSubScanSpecList)
throws ExecutionSetupException {
this(userName,
(KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig),
@@ -61,7 +61,7 @@ public class KafkaSubScan extends AbstractBase implements
SubScan {
public KafkaSubScan(String userName,
KafkaStoragePlugin kafkaStoragePlugin,
List<SchemaPath> columns,
- List<KafkaSubScanSpec> partitionSubScanSpecList) {
+ List<KafkaPartitionScanSpec> partitionSubScanSpecList) {
super(userName);
this.kafkaStoragePlugin = kafkaStoragePlugin;
this.columns = columns;
@@ -95,7 +95,7 @@ public class KafkaSubScan extends AbstractBase implements
SubScan {
}
@JsonProperty
- public List<KafkaSubScanSpec> getPartitionSubScanSpecList() {
+ public List<KafkaPartitionScanSpec> getPartitionSubScanSpecList() {
return partitionSubScanSpecList;
}
@@ -108,68 +108,4 @@ public class KafkaSubScan extends AbstractBase implements
SubScan {
public int getOperatorType() {
return CoreOperatorType.KAFKA_SUB_SCAN_VALUE;
}
-
- public static class KafkaSubScanSpec {
- protected String topicName;
- protected int partitionId;
- protected long startOffset;
- protected long endOffset;
-
- @JsonCreator
- public KafkaSubScanSpec(@JsonProperty("topicName") String topicName,
@JsonProperty("partitionId") int partitionId,
- @JsonProperty("startOffset") long startOffset,
@JsonProperty("endOffset") long endOffset) {
- this.topicName = topicName;
- this.partitionId = partitionId;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
-
- KafkaSubScanSpec() {
-
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public long getStartOffset() {
- return startOffset;
- }
-
- public long getEndOffset() {
- return endOffset;
- }
-
- public KafkaSubScanSpec setTopicName(String topicName) {
- this.topicName = topicName;
- return this;
- }
-
- public KafkaSubScanSpec setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- return this;
- }
-
- public KafkaSubScanSpec setStartOffset(long startOffset) {
- this.startOffset = startOffset;
- return this;
- }
-
- public KafkaSubScanSpec setEndOffset(long endOffset) {
- this.endOffset = endOffset;
- return this;
- }
-
- @Override
- public String toString() {
- return "KafkaSubScanSpec [topicName=" + topicName + ", partitionId=" +
partitionId + ", startOffset="
- + startOffset + ", endOffset=" + endOffset + "]";
- }
-
- }
-
}
diff --git
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
index 3afb1b8..1c814f6 100644
---
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
+++
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,7 +45,7 @@ public class MessageIterator implements
Iterator<ConsumerRecord<byte[], byte[]>>
private final long kafkaPollTimeOut;
private final long endOffset;
- public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer,
final KafkaSubScanSpec subScanSpec,
+ public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer,
final KafkaPartitionScanSpec subScanSpec,
final long kafkaPollTimeOut) {
this.kafkaConsumer = kafkaConsumer;
this.kafkaPollTimeOut = kafkaPollTimeOut;
diff --git
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
new file mode 100644
index 0000000..7be0ec3
--- /dev/null
+++
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.categories.KafkaStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG;
+import static
org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
+
+/**
+ * Tests filter pushdown on the Kafka metadata fields {@code kafkaMsgOffset},
+ * {@code kafkaPartitionId} and {@code kafkaMsgTimestamp}.
+ *
+ * <p>Each test checks both the number of rows the query returns and the {@code cost} reported for
+ * the Kafka scan in the physical plan, which reflects how many messages remain to be scanned after
+ * pushdown. The topic has {@code NUM_PARTITIONS} partitions populated by
+ * {@link KafkaMessageGenerator#populateJsonMsgWithTimestamps}. The boundary tests treat offset 10
+ * as the end offset, i.e. the expected values assume 10 messages per partition.
+ */
+@Category({KafkaStorageTest.class, SlowTest.class})
+public class KafkaFilterPushdownTest extends KafkaTestBase {
+  private static final int NUM_PARTITIONS = 5;
+  private static final String expectedSubStr = " \"kafkaScanSpec\" : {\n" +
+      " \"topicName\" : \"drill-pushdown-topic\"\n" +
+      " },\n" +
+      " \"cost\" : %s.0";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestKafkaSuit.createTopicHelper(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_PARTITIONS);
+    KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
+        StringSerializer.class);
+    generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG);
+  }
+
+  /**
+   * Runs {@code queryString} and verifies that it returns {@code expectedRowCount} rows. It then
+   * verifies that the physical plan's Kafka scan reports a cost of {@code expectedRowCountInPlan}.
+   */
+  private void verifyPushdown(String queryString, int expectedRowCount, int expectedRowCountInPlan)
+      throws Exception {
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaMsgOffset.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnOffset() throws Exception {
+    final String predicate1 = "kafkaMsgOffset > 4";
+    final String predicate2 = "kafkaMsgOffset < 6";
+    final int expectedRowCount = 5; //1 * NUM_PARTITIONS
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaPartitionId.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnPartition() throws Exception {
+    final String predicate = "kafkaPartitionId = 1";
+    final int expectedRowCount = NUM_JSON_MSG;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaMsgTimestamp.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnTimestamp() throws Exception {
+    final String predicate = "kafkaMsgTimestamp > 6";
+    final int expectedRowCount = 20;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test filter pushdown when timestamp is not ordered.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownUnorderedTimestamp() throws Exception {
+    final String predicate = "kafkaMsgTimestamp = 1";
+    final int expectedRowInPlan = 50;
+    final int expectedRowCount = 5;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowInPlan);
+  }
+
+  /**
+   * Test filter pushdown when timestamp value specified does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWhenTimestampDoesNotExist() throws Exception {
+    final String predicate = "kafkaMsgTimestamp = 20"; //20 does not exist
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test filter pushdown when partition value specified does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWhenPartitionDoesNotExist() throws Exception {
+    final String predicate = "kafkaPartitionId = 100"; //100 does not exist
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test filter pushdown when timestamp exists but partition does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownForEmptyScanSpec() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 100";
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test filter pushdown on kafkaMsgOffset with boundary conditions.
+   * In every case, the number of records returned is 0.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
+    final int expectedRowCount = 0;
+    final String[] predicates = {
+        "kafkaMsgOffset = 10",  //"equal" such that value = endOffset
+        "kafkaMsgOffset = -1",  //"equal" such that value < startOffset
+        "kafkaMsgOffset > 9",   //"greater_than" such that value = endOffset-1
+        "kafkaMsgOffset >= 10", //"greater_than_or_equal" such that value = endOffset
+        "kafkaMsgOffset < 0",   //"less_than" such that value = startOffset
+        "kafkaMsgOffset <= -1"  //"less_than_or_equal" such that value < startOffset
+    };
+
+    for (String predicate : predicates) {
+      final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+          TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+      verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+    }
+  }
+
+  /**
+   * Test filter pushdown on kafkaMsgOffset with boundary conditions.
+   * In every case, the number of records returned is 5 (1 per topic-partition).
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
+    final int expectedRowCount = 5;
+    final String[] predicates = {
+        "kafkaMsgOffset = 9",  //"equal" such that value = endOffset-1
+        "kafkaMsgOffset > 8",  //"greater_than" such that value = endOffset-2
+        "kafkaMsgOffset >= 9"  //"greater_than_or_equal" such that value = endOffset-1
+    };
+
+    for (String predicate : predicates) {
+      final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+          TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+      verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+    }
+  }
+
+  /**
+   * Test filter pushdown with OR.
+   * Pushdown is possible if all the predicates are on metadata fields.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithOr() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 1";
+    final int expectedRowCount = 26;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test filter pushdown with OR on kafkaMsgTimestamp and kafkaMsgOffset.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithOr1() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp = 6";
+    final String predicate2 = "kafkaMsgOffset = 6";
+    final int expectedRowInPlan = 25; //startOff=5, endOff=9
+    final int expectedRowCount = 10;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowInPlan);
+  }
+
+  /**
+   * Test pushdown for a combination of AND and OR.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithAndOrCombo() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 1";
+    final String predicate3 = "kafkaPartitionId = 2";
+    final int expectedRowCount = 8;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_1,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCount);
+  }
+
+  /**
+   * Test pushdown for a combination of AND and OR.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithAndOrCombo2() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp = 6";
+    final String predicate2 = "kafkaMsgOffset = 6";
+    final String predicate3 = "kafkaPartitionId = 1";
+    final String predicate4 = "kafkaPartitionId = 2";
+    final int expectedRowCountInPlan = 10; //startOff=5, endOff=9 for 2 partitions
+    final int expectedRowCount = 4;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_3,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCountInPlan);
+  }
+
+  /**
+   * Test pushdown for predicate1 AND predicate2,
+   * where predicate1 is on a metadata field and predicate2 is on a user field.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownTimestampWithNonMetaField() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "boolKey = true";
+    final int expectedRowCountInPlan = 20; //startOff=6, endOff=9
+    final int expectedRowCount = 10;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCountInPlan);
+  }
+
+  /**
+   * Tests that pushdown does not happen for predicates such as
+   * non-metadata-field = val1 OR (kafkaMsgTimestamp > val2 AND kafkaMsgTimestamp < val3).
+   * (Despite its name, this test filters on kafkaMsgTimestamp, not kafkaMsgOffset.)
+   * @throws Exception
+   */
+  @Test
+  public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
+    final String predicate1 = "boolKey = true";
+    final String predicate2 = "kafkaMsgTimestamp > 6";
+    final String predicate3 = "kafkaMsgTimestamp < 9";
+    final int expectedRowCountInPlan = 50; //no pushdown
+    final int expectedRowCount = 30;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_2,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
+
+    verifyPushdown(queryString, expectedRowCount, expectedRowCountInPlan);
+  }
+
+}
diff --git
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index 1931898..32d9ff1 100644
---
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,4 +132,37 @@ public class KafkaMessageGenerator {
}
}
+  /**
+   * Publishes {@code numMsg} JSON messages to every partition of {@code topic}. Each message
+   * carries an explicit record timestamp so that timestamp-based filter pushdown can be tested.
+   *
+   * <p>For the {@code i}-th message of a partition ({@code i} starting at 1), the timestamp is
+   * {@code halfCount - i} when {@code i < halfCount} and {@code i} otherwise, where
+   * {@code halfCount = numMsg / 2}. Timestamps therefore first decrease and then increase with
+   * the offset, i.e. they are intentionally not ordered.
+   *
+   * @param topic  topic whose partitions receive the messages
+   * @param numMsg number of messages published to each partition
+   * @throws DrillRuntimeException if creating the producer or sending any message fails
+   */
+  public void populateJsonMsgWithTimestamps(String topic, int numMsg) {
+    final int halfCount = numMsg / 2;
+    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+      for (PartitionInfo tpInfo : producer.partitionsFor(topic)) {
+        for (int i = 1; i <= numMsg; ++i) {
+          JsonObject object = new JsonObject();
+          object.addProperty("stringKey", UUID.randomUUID().toString());
+          object.addProperty("intKey", numMsg - i);
+          object.addProperty("boolKey", i % 2 == 0);
+
+          long timestamp = i < halfCount ? (halfCount - i) : i;
+          ProducerRecord<String, String> message = new ProducerRecord<>(tpInfo.topic(),
+              tpInfo.partition(), timestamp, "key" + i, object.toString());
+          logger.info("Publishing message : {}", message);
+          Future<RecordMetadata> future = producer.send(message);
+          // Block on each send so offsets are assigned in the order the messages are generated.
+          logger.info("Committed offset of the message : {}", future.get().offset());
+        }
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for callers before converting to an unchecked exception.
+      Thread.currentThread().interrupt();
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
}
diff --git
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
index 4a15596..4347167 100644
---
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
+++
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -25,7 +25,6 @@ import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -40,7 +39,7 @@ import org.junit.experimental.categories.Category;
public class MessageIteratorTest extends KafkaTestBase {
private KafkaConsumer<byte[], byte[]> kafkaConsumer;
- private KafkaSubScanSpec subScanSpec;
+ private KafkaPartitionScanSpec subScanSpec;
@Before
public void setUp() {
@@ -49,7 +48,7 @@ public class MessageIteratorTest extends KafkaTestBase {
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
kafkaConsumer = new KafkaConsumer<>(consumerProps);
- subScanSpec = new KafkaSubScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0,
TestKafkaSuit.NUM_JSON_MSG);
+ subScanSpec = new KafkaPartitionScanSpec(TestQueryConstants.JSON_TOPIC, 0,
0, TestKafkaSuit.NUM_JSON_MSG);
}
@After
diff --git
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index ed01747..ecf998e 100644
---
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -27,6 +27,7 @@ import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.ZookeeperTestUtil;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.security.JaasUtils;
@@ -46,7 +47,8 @@ import kafka.utils.ZkUtils;
@Category({KafkaStorageTest.class, SlowTest.class})
@RunWith(Suite.class)
-@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class,
MessageReaderFactoryTest.class })
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class,
MessageReaderFactoryTest.class,
+ KafkaFilterPushdownTest.class })
public class TestKafkaSuit {
private static final Logger logger =
LoggerFactory.getLogger(LoggerFactory.class);
private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
@@ -106,4 +108,18 @@ public class TestKafkaSuit {
}
}
+  /**
+   * Creates {@code topicName} in the embedded cluster with {@code partitions} partitions and a
+   * replication factor of 1, with message timestamp type {@code CreateTime}. It then logs the
+   * topic metadata read back from ZooKeeper.
+   *
+   * @param topicName  name of the topic to create
+   * @param partitions number of partitions of the new topic
+   */
+  public static void createTopicHelper(final String topicName, final int partitions) {
+    Properties topicProps = new Properties();
+    // CreateTime keeps the timestamps supplied by the producer, which the timestamp pushdown
+    // tests depend on.
+    topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
+    // NOTE(review): zkUtils is not closed here; it wraps the shared static zkClient, and closing it
+    // would presumably close that client too -- confirm before adding a close() call.
+    ZkUtils zkUtils = new ZkUtils(zkClient,
+        new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
+    AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
+        topicProps, RackAwareMode.Disabled$.MODULE$);
+
+    org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk =
+        AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
+    logger.info("Topic Metadata: {}", fetchTopicMetadataFromZk);
+  }
+
}
diff --git
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
index 057af7e..b3163ad 100644
---
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
+++
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
@@ -29,12 +29,24 @@ public interface TestQueryConstants {
int MAX_CLIENT_CONNECTIONS = 100;
String JSON_TOPIC = "drill-json-topic";
+ String JSON_PUSHDOWN_TOPIC = "drill-pushdown-topic";
String AVRO_TOPIC = "drill-avro-topic";
String INVALID_TOPIC = "invalid-topic";
+ String KAFKA_MSG_TIMESTAMP_FIELD = "kafkaMsgTimestamp";
+ String KAFKA_PARTITION_ID_FIELD = "kafkaPartitionId";
+ String KAFKA_MSG_OFFSET_FIELD = "kafkaMsgOffset";
+
// Queries
String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
String MSG_SELECT_QUERY = "select * from kafka.`%s`";
String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from
kafka.`%s`";
String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from
kafka.`%s`";
+
+ String QUERY_TEMPLATE_BASIC = "select * from kafka.`%s` where %s";
+ String QUERY_TEMPLATE_AND = "select * from kafka.`%s` where %s AND %s";
+ String QUERY_TEMPLATE_OR = "select * from kafka.`%s` where %s OR %s";
+ String QUERY_TEMPLATE_AND_OR_PATTERN_1 = "select * from kafka.`%s` where %s
AND (%s OR %s)";
+ String QUERY_TEMPLATE_AND_OR_PATTERN_2 = "select * from kafka.`%s` where %s
OR (%s AND %s)";
+ String QUERY_TEMPLATE_AND_OR_PATTERN_3 = "select * from kafka.`%s` where (%s
OR %s) AND (%s OR %s)";
}