This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 75e4877 DRILL-92: Cassandra storage plugin implementation based on
Calcite adapter (#2152)
75e4877 is described below
commit 75e4877b46380188b6545b99f188b764b2a576aa
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri Jan 29 10:04:32 2021 +0200
DRILL-92: Cassandra storage plugin implementation based on Calcite adapter
(#2152)
---
contrib/pom.xml | 1 +
contrib/storage-cassandra/README.md | 47 +++
contrib/storage-cassandra/pom.xml | 107 +++++++
.../java/com/codahale/metrics/JmxReporter.java | 66 ++++
.../calcite/adapter/cassandra/CalciteUtils.java | 65 ++++
.../store/cassandra/CassandraStorageConfig.java | 96 ++++++
.../store/cassandra/CassandraStoragePlugin.java | 76 +++++
.../plan/CassandraEnumerablePrelContext.java | 81 +++++
.../cassandra/plan/DrillCassandraLimitRule.java | 43 +++
.../cassandra/schema/CassandraDrillSchema.java | 80 +++++
.../cassandra/schema/CassandraDynamicTable.java | 95 ++++++
.../cassandra/schema/CassandraRootDrillSchema.java | 83 +++++
.../schema/CassandraRootDrillSchemaFactory.java | 45 +++
.../main/resources/bootstrap-storage-plugins.json | 11 +
.../src/main/resources/drill-module.conf | 24 ++
.../exec/store/cassandra/BaseCassandraTest.java | 84 ++++++
.../store/cassandra/CassandraComplexTypesTest.java | 69 +++++
.../exec/store/cassandra/CassandraPlanTest.java | 63 ++++
.../exec/store/cassandra/CassandraQueryTest.java | 335 +++++++++++++++++++++
.../src/test/resources/queries.cql | 96 ++++++
distribution/pom.xml | 5 +
distribution/src/assemble/component.xml | 1 +
.../apache/drill/exec/record/ColumnConverter.java | 10 +-
.../enumerable/plan/EnumerablePrelContext.java | 2 +-
pom.xml | 2 +-
25 files changed, 1584 insertions(+), 3 deletions(-)
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 40c66f1..c94deff 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -61,6 +61,7 @@
<module>storage-http</module>
<module>storage-druid</module>
<module>storage-elasticsearch</module>
+ <module>storage-cassandra</module>
</modules>
</project>
diff --git a/contrib/storage-cassandra/README.md
b/contrib/storage-cassandra/README.md
new file mode 100644
index 0000000..07d58ab
--- /dev/null
+++ b/contrib/storage-cassandra/README.md
@@ -0,0 +1,47 @@
+# Drill Cassandra Plugin
+
+Drill Cassandra storage plugin allows you to perform SQL queries against
Cassandra tables.
+This storage plugin implementation is based on [Apache Calcite adapter for
Cassandra](https://calcite.apache.org/docs/cassandra_adapter.html).
+
+This storage plugin may be used for querying Scylla DB.
+
+### Supported optimizations and features
+
+This storage plugin supports the following optimizations:
+
+- Project pushdown
+- Filter pushdown (only expressions supported by Calcite adapter for Cassandra)
+- Limit pushdown
+
+Except for these optimizations, Cassandra storage plugin supports the schema
provisioning feature.
+For more details please refer to [Specifying the Schema as Table Function
Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter).
+
+### Plugin registration
+
+The plugin can be registered in Apache Drill using the drill web interface by
navigating to the `storage` page.
+Following is the default registration configuration.
+
+```json
+{
+ "type" : "cassandra",
+ "host" : "localhost",
+ "port" : 9042,
+ "username" : null,
+ "password" : null,
+ "enabled": false
+}
+```
+
+### Developer notes
+
+Most of the common classes required for creating storage plugins based on
Calcite adapters are placed in the
+`java-exec` module, so they can be reused in future plugin implementations.
+
+Here is the list of the classes that may be useful:
+
+- `VertexDrelConverterRule` with `VertexDrel` - used to hold plugin-specific
part of the plan at the end of the
+ `LOGICAL` planning phase.
+- `EnumerableIntermediatePrelConverterRule` with `EnumerableIntermediatePrel`
- the same as above, but for the
+ `PHYSICAL` planning phase.
+- `EnumerablePrel` - responsible for generating java code that will be
executed to query the storage plugin data source.
+- `EnumerableRecordReader` - executes java code generated in `EnumerablePrel`
and transforms obtained results to Drill internal representation.
diff --git a/contrib/storage-cassandra/pom.xml
b/contrib/storage-cassandra/pom.xml
new file mode 100644
index 0000000..6cc7f83
--- /dev/null
+++ b/contrib/storage-cassandra/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-storage-cassandra</artifactId>
+
+ <name>contrib/cassandra-storage-plugin</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${calcite.groupId}</groupId>
+ <artifactId>calcite-cassandra</artifactId>
+ <version>${calcite.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.scylladb</groupId>
+ <artifactId>scylla-driver-core</artifactId>
+ <version>3.10.1-scylla-0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ <version>${asm.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.nosan</groupId>
+ <artifactId>embedded-cassandra</artifactId>
+ <version>4.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.hydromatic</groupId>
+ <artifactId>foodmart-data-json</artifactId>
+ <version>0.4</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkCount combine.self="override">1</forkCount>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/contrib/storage-cassandra/src/main/java/com/codahale/metrics/JmxReporter.java
b/contrib/storage-cassandra/src/main/java/com/codahale/metrics/JmxReporter.java
new file mode 100644
index 0000000..970ded1
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/com/codahale/metrics/JmxReporter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.codahale.metrics;
+
+import java.io.Closeable;
+
+/**
+ * Adapter for compatibility of metrics-jms for 3 and 4 versions.
+ */
+public class JmxReporter implements Reporter, Closeable {
+
+ private final com.codahale.metrics.jmx.JmxReporter delegate;
+
+ public JmxReporter(com.codahale.metrics.jmx.JmxReporter delegate) {
+ this.delegate = delegate;
+ }
+
+ public static Builder forRegistry(MetricRegistry registry) {
+ return new Builder(registry);
+ }
+
+ public void start() {
+ delegate.start();
+ }
+
+ public void stop() {
+ delegate.stop();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ public static class Builder {
+ private final com.codahale.metrics.jmx.JmxReporter.Builder delegate;
+
+ public Builder(MetricRegistry registry) {
+ delegate = com.codahale.metrics.jmx.JmxReporter.forRegistry(registry);
+ }
+
+ public Builder inDomain(String domain) {
+ delegate.inDomain(domain);
+ return this;
+ }
+
+ public JmxReporter build() {
+ return new JmxReporter(delegate.build());
+ }
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CalciteUtils.java
b/contrib/storage-cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CalciteUtils.java
new file mode 100644
index 0000000..e996522
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CalciteUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import com.datastax.driver.core.Session;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.cassandra.CassandraStorageConfig;
+import
org.apache.drill.exec.store.cassandra.plan.CassandraEnumerablePrelContext;
+import org.apache.drill.exec.store.cassandra.plan.DrillCassandraLimitRule;
+import org.apache.drill.exec.store.cassandra.schema.CassandraDrillSchema;
+import
org.apache.drill.exec.store.enumerable.plan.EnumerableIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CalciteUtils {
+ private static final VertexDrelConverterRule VERTEX_DREL_CONVERTER_RULE =
+ new VertexDrelConverterRule(CassandraRel.CONVENTION);
+
+ private static final ConverterRule
ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE =
+ new EnumerableIntermediatePrelConverterRule(
+ new CassandraEnumerablePrelContext(CassandraStorageConfig.NAME));
+
+ public static CassandraTableScan tableScanCreator(RelOptCluster cluster,
RelTraitSet traitSet,
+ RelOptTable table, CassandraTable cassandraTable,
+ RelDataType projectRowType) {
+ return new CassandraTableScan(cluster, traitSet, table, cassandraTable,
projectRowType);
+ }
+
+ public static Set<RelOptRule> cassandraRules() {
+ Set<RelOptRule> rules = Arrays.stream(CassandraRules.RULES)
+ .collect(Collectors.toSet());
+ rules.add(DrillCassandraLimitRule.INSTANCE);
+ rules.add(ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE);
+ rules.add(VERTEX_DREL_CONVERTER_RULE);
+ return rules;
+ }
+
+ public static Session getSession(SchemaPlus schema) {
+ return
schema.unwrap(CassandraDrillSchema.class).getDelegatingSchema().session;
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
new file mode 100644
index 0000000..837bb58
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(CassandraStorageConfig.NAME)
+public class CassandraStorageConfig extends StoragePluginConfig {
+ public static final String NAME = "cassandra";
+
+ private final String host;
+ private final String username;
+ private final String password;
+ private final int port;
+
+ @JsonCreator
+ public CassandraStorageConfig(
+ @JsonProperty("host") String host,
+ @JsonProperty("port") int port,
+ @JsonProperty("username") String username,
+ @JsonProperty("password") String password) {
+ this.host = host;
+ this.username = username;
+ this.password = password;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @JsonIgnore
+ public Map<String, Object> toConfigMap() {
+ Map<String, Object> result = new HashMap<>();
+
+ result.put("host", host);
+ result.put("port", port);
+ result.put("username", username);
+ result.put("password", password);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CassandraStorageConfig that = (CassandraStorageConfig) o;
+ return Objects.equals(host, that.host)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, username, password);
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
new file mode 100644
index 0000000..d5f1a8a
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.apache.calcite.adapter.cassandra.CalciteUtils;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import
org.apache.drill.exec.store.cassandra.schema.CassandraRootDrillSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class CassandraStoragePlugin extends AbstractStoragePlugin {
+
+ private final CassandraStorageConfig config;
+ private final SchemaFactory schemaFactory;
+
+ public CassandraStoragePlugin(
+ CassandraStorageConfig config, DrillbitContext context, String name) {
+ super(context, name);
+ this.config = config;
+ this.schemaFactory = new CassandraRootDrillSchemaFactory(name, this);
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
+ }
+
+ @Override
+ public CassandraStorageConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext
optimizerContext, PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return CalciteUtils.cassandraRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return ImmutableSet.of();
+ }
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
new file mode 100644
index 0000000..06cf181
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.plan;
+
+import org.apache.calcite.adapter.cassandra.CassandraToEnumerableConverterRule;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.enumerable.plan.EnumerablePrelContext;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class CassandraEnumerablePrelContext implements EnumerablePrelContext {
+ private final String planPrefix;
+
+ public CassandraEnumerablePrelContext(String planPrefix) {
+ this.planPrefix = planPrefix;
+ }
+
+ @Override
+ public String generateCode(RelOptCluster cluster, RelNode relNode) {
+ RelNode enumerableRel =
+ CassandraToEnumerableConverterRule.INSTANCE.convert(relNode);
+
+ ClassDeclaration classDeclaration = new
EnumerableRelImplementor(cluster.getRexBuilder(), Collections.emptyMap())
+ .implementRoot((EnumerableRel) enumerableRel,
EnumerableRel.Prefer.ARRAY);
+ return Expressions.toString(Collections.singletonList(classDeclaration),
"\n", false);
+ }
+
+ @Override
+ public RelNode transformNode(RelNode input) {
+ return input.accept(SubsetRemover.INSTANCE);
+ }
+
+ @Override
+ public Map<String, Integer> getFieldsMap(RelNode transformedNode) {
+ return transformedNode.getRowType().getFieldList().stream()
+ .collect(Collectors.toMap(
+ RelDataTypeField::getName,
+ RelDataTypeField::getIndex
+ ));
+ }
+
+ @Override
+ public String getPlanPrefix() {
+ return planPrefix;
+ }
+
+ @Override
+ public String getTablePath(RelNode input) {
+ TableScan scan = Objects.requireNonNull(DrillRelOptUtil.findScan(input));
+ List<String> qualifiedName = scan.getTable().getQualifiedName();
+ return String.join(".", qualifiedName.subList(0, qualifiedName.size() -
1));
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/DrillCassandraLimitRule.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/DrillCassandraLimitRule.java
new file mode 100644
index 0000000..6c4baf4
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/DrillCassandraLimitRule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.plan;
+
+import org.apache.calcite.adapter.cassandra.CassandraLimit;
+import org.apache.calcite.adapter.cassandra.CassandraRel;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillRel;
+
+public class DrillCassandraLimitRule extends ConverterRule {
+ public static final DrillCassandraLimitRule INSTANCE = new
DrillCassandraLimitRule();
+
+ private DrillCassandraLimitRule() {
+ super(DrillLimitRelBase.class, DrillRel.DRILL_LOGICAL,
CassandraRel.CONVENTION,
+ "DrillCassandraLimitRule");
+ }
+
+ public RelNode convert(RelNode relNode) {
+ DrillLimitRelBase limit = (DrillLimitRelBase) relNode;
+ final RelTraitSet traitSet =
+ limit.getTraitSet().replace(CassandraRel.CONVENTION);
+ return new CassandraLimit(limit.getCluster(), traitSet,
+ convert(limit.getInput(), CassandraRel.CONVENTION), limit.getOffset(),
limit.getFetch());
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDrillSchema.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDrillSchema.java
new file mode 100644
index 0000000..eb8796f
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDrillSchema.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.cassandra.CassandraSchema;
+import org.apache.calcite.adapter.cassandra.CassandraTable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.cassandra.CassandraStorageConfig;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CassandraDrillSchema extends AbstractSchema {
+ private final CassandraSchema delegatingSchema;
+ private final StoragePlugin plugin;
+ private final Map<String, Table> tables = new ConcurrentHashMap<>();
+
+ public CassandraDrillSchema(String name, StoragePlugin plugin,
CassandraSchema delegatingSchema) {
+ super(Collections.emptyList(), name);
+ this.plugin = plugin;
+ this.delegatingSchema = delegatingSchema;
+ }
+
+ @Override
+ public String getTypeName() {
+ return CassandraStorageConfig.NAME;
+ }
+
+ @Override
+ public Table getTable(String tableName) {
+ return tables.computeIfAbsent(tableName, this::getDrillTable);
+ }
+
+ private DrillTable getDrillTable(String tableName) {
+ CassandraTable table = (CassandraTable)
delegatingSchema.getTable(tableName);
+ return table == null ? null
+ : new CassandraDynamicTable(plugin, tableName, null, table);
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return delegatingSchema.getTableNames();
+ }
+
+ @Override
+ public Expression getExpression(SchemaPlus parentSchema, String name) {
+ return Expressions.call(
+ DataContext.ROOT,
+ BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
+ }
+
+ public CassandraSchema getDelegatingSchema() {
+ return delegatingSchema;
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDynamicTable.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDynamicTable.java
new file mode 100644
index 0000000..8b1d58d
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDynamicTable.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.adapter.cassandra.CassandraRel;
+import org.apache.calcite.adapter.cassandra.CassandraTable;
+import org.apache.calcite.adapter.cassandra.CalciteUtils;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.Wrapper;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+public class CassandraDynamicTable extends DrillTable implements
TranslatableTable, QueryableTable, Wrapper {
+
+ private final CassandraTable table;
+
+ public CassandraDynamicTable(StoragePlugin plugin, String storageEngineName,
Object selection, CassandraTable table) {
+ super(storageEngineName, plugin, selection);
+ this.table = table;
+ }
+
+ @Override
+ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable
relOptTable) {
+ RelOptCluster cluster = context.getCluster();
+ return CalciteUtils.tableScanCreator(cluster,
cluster.traitSetOf(CassandraRel.CONVENTION),
+ relOptTable, table, relOptTable.getRowType());
+ }
+
+ @Override
+ public <V> Queryable<V> asQueryable(QueryProvider queryProvider, SchemaPlus
schema, String tableName) {
+ CassandraTable cassandraTable = CassandraDynamicTable.this.table;
+ return new CassandraTable.CassandraQueryable<V>(queryProvider, schema,
cassandraTable, tableName) {
+ public Enumerable<Object> query(List<Map.Entry<String, Class>> fields,
+ List<Map.Entry<String, String>> selectFields, List<String>
predicates,
+ List<String> order, Integer offset, Integer fetch) {
+ return cassandraTable.query(CalciteUtils.getSession(schema), fields,
selectFields, predicates,
+ order, offset, fetch);
+ }
+ };
+ }
+
+ @Override
+ public Type getElementType() {
+ return table.getElementType();
+ }
+
+ public <C> C unwrap(Class<C> aClass) {
+ if (aClass.isInstance(this)) {
+ return aClass.cast(this);
+ } else if (aClass.isInstance(table)) {
+ return aClass.cast(table);
+ }
+ return null;
+ }
+
+ @Override
+ public Expression getExpression(SchemaPlus schema, String tableName, Class
clazz) {
+ return table.getExpression(schema, tableName, clazz);
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return table.getRowType(typeFactory);
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchema.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchema.java
new file mode 100644
index 0000000..513f7ae
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchema.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.cassandra.CassandraSchema;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.cassandra.CassandraStorageConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CassandraRootDrillSchema extends AbstractSchema {
+ private final Map<String, Schema> schemas = new ConcurrentHashMap<>();
+
+ private final StoragePlugin plugin;
+ private final SchemaFactory schemaFactory;
+ private final SchemaPlus parent;
+ private final String parentName;
+ private final Map<String, Object> configMap;
+
+ public CassandraRootDrillSchema(String name, StoragePlugin plugin,
SchemaFactory schemaFactory,
+ SchemaPlus parent, String parentName, Map<String, Object> configMap) {
+ super(Collections.emptyList(), name);
+ this.plugin = plugin;
+ this.schemaFactory = schemaFactory;
+ this.parent = parent;
+ this.parentName = parentName;
+ this.configMap = configMap;
+ }
+
+ @Override
+ public String getTypeName() {
+ return CassandraStorageConfig.NAME;
+ }
+
+ @Override
+ public Schema getSubSchema(String name) {
+ return schemas.computeIfAbsent(name, this::createSubSchema);
+ }
+
+ private Schema createSubSchema(String schemaName) {
+ Map<String, Object> configs = new HashMap<>(configMap);
+ configs.put("keyspace", schemaName);
+
+ SchemaPlus parentSchema = parent.getSubSchema(parentName);
+ Schema schema = new CassandraDrillSchema(schemaName, plugin,
+ (CassandraSchema) schemaFactory.create(parentSchema, schemaName,
configs));
+ parentSchema.add(schemaName, schema);
+ return schema;
+ }
+
+ @Override
+ public Expression getExpression(SchemaPlus parentSchema, String name) {
+ return Expressions.call(
+ DataContext.ROOT,
+ BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
new file mode 100644
index 0000000..edcf72b
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.adapter.cassandra.CassandraSchemaFactory;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.cassandra.CassandraStoragePlugin;
+
+public class CassandraRootDrillSchemaFactory extends AbstractSchemaFactory {
+
+ private final CassandraStoragePlugin plugin;
+ private final SchemaFactory calciteSchemaFactory;
+
+ public CassandraRootDrillSchemaFactory(String name, CassandraStoragePlugin
plugin) {
+ super(name);
+ this.plugin = plugin;
+ this.calciteSchemaFactory = new CassandraSchemaFactory();
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ Schema schema = new CassandraRootDrillSchema(getName(), plugin,
+ calciteSchemaFactory, parent, getName(),
plugin.getConfig().toConfigMap());
+ parent.add(getName(), schema);
+ }
+}
diff --git
a/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json
b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..f433910
--- /dev/null
+++
b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,11 @@
+{
+ "storage":{
+ "cassandra" : {
+ "type" : "cassandra",
+ "host" : "localhost",
+ "port" : 9042,
+ "username" : null,
+ "password" : null
+ }
+ }
+}
diff --git a/contrib/storage-cassandra/src/main/resources/drill-module.conf
b/contrib/storage-cassandra/src/main/resources/drill-module.conf
new file mode 100755
index 0000000..d23eabc
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see
https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.cassandra"
+}
diff --git
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
new file mode 100644
index 0000000..9413db2
--- /dev/null
+++
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.github.nosan.embedded.cassandra.Cassandra;
+import com.github.nosan.embedded.cassandra.CassandraBuilder;
+import com.github.nosan.embedded.cassandra.Settings;
+import com.github.nosan.embedded.cassandra.cql.CqlScript;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.util.ComparableVersion;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.junit.Assume.assumeTrue;
+
+public class BaseCassandraTest extends ClusterTest {
+ private static Cassandra cassandra;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ assumeTrue(
+ "Skipping tests for JDK 12+ since Cassandra supports only versions up
to 11 (including).",
+ new ComparableVersion(System.getProperty("java.version"))
+ .compareTo(new ComparableVersion("12")) < 0);
+
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ startCassandra();
+
+ CassandraStorageConfig config = new CassandraStorageConfig(
+ cassandra.getSettings().getAddress().getHostAddress(),
+ cassandra.getSettings().getPort(),
+ null,
+ null);
+ config.setEnabled(true);
+ cluster.defineStoragePlugin("cassandra", config);
+
+ prepareData();
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ if (cassandra != null) {
+ cassandra.stop();
+ }
+ }
+
+ private static void startCassandra() {
+ cassandra = new CassandraBuilder().build();
+ cassandra.start();
+ }
+
+ private static void prepareData() {
+ Settings settings = cassandra.getSettings();
+
+ try (Cluster cluster = Cluster.builder()
+ .addContactPoints(settings.getAddress())
+ .withPort(settings.getPort())
+ .withoutMetrics()
+ .withoutJMXReporting()
+ .build()) {
+ Session session = cluster.connect();
+ CqlScript.ofClassPath("queries.cql").forEachStatement(session::execute);
+ }
+ }
+}
diff --git
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraComplexTypesTest.java
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraComplexTypesTest.java
new file mode 100644
index 0000000..9692369
--- /dev/null
+++
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraComplexTypesTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.junit.Test;
+
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+
+public class CassandraComplexTypesTest extends BaseCassandraTest {
+
+ @Test
+ public void testSelectStarWithArray() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from cassandra.test_keyspace.arr")
+ .unOrdered()
+ .baselineColumns("f_int", "string_arr", "int_arr", "int_set")
+ .baselineValues(0, listOf("a", "b", "c", "d"), listOf(1, 2, 3, 4, 0),
+ listOf(9, 8, 7, 6, 5))
+ .go();
+ }
+
+ @Test
+ public void testSelectArrayElem() throws Exception {
+ testBuilder()
+ .sqlQuery("select string_arr[0] c1, int_arr[1] c2 from
cassandra.test_keyspace.arr")
+ .unOrdered()
+ .baselineColumns("c1", "c2")
+ .baselineValues("a", 2)
+ .go();
+ }
+
+ @Test
+ public void testSelectStarWithJson() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from cassandra.test_keyspace.map")
+ .unOrdered()
+ .baselineColumns("prim_field", "nest_field", "more_nest_field",
"map_arr")
+ .baselineValues(0, mapOf("a", "123", "b", "abc"),
+ mapOf("a", mapOf("b", "abc")),
+ listOf(mapOf("a", 123, "b", 321), mapOf("c", 456, "d", 789)))
+ .go();
+ }
+
+ @Test
+ public void testSelectNestedFields() throws Exception {
+ testBuilder()
+ .sqlQuery("select m.nest_field.a a, m.nest_field.b b from
cassandra.test_keyspace.map m")
+ .unOrdered()
+ .baselineColumns("a", "b")
+ .baselineValues("123", "abc")
+ .go();
+ }
+}
diff --git
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraPlanTest.java
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraPlanTest.java
new file mode 100644
index 0000000..0af8d4b
--- /dev/null
+++
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraPlanTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.junit.Test;
+
+public class CassandraPlanTest extends BaseCassandraTest {
+
+ @Test
+ public void testProjectPushDown() throws Exception {
+ queryBuilder()
+ .sql("select n_name, n_nationkey from
cassandra.test_keyspace.`nation`")
+ .planMatcher()
+ .include("cassandra=.*n_name.*n_nationkey")
+ .exclude("\\*\\*")
+ .match();
+ }
+
+ @Test
+ public void testFilterPushDown() throws Exception {
+ queryBuilder()
+ .sql("select n_name, n_nationkey from cassandra.test_keyspace.`nation`
where n_nationkey = 0")
+ .planMatcher()
+ .include("CassandraFilter")
+ .match();
+ }
+
+ @Test
+ public void testFilterPushDownWithJoin() throws Exception {
+ String query = "select * from cassandra.test_keyspace.`nation` e\n" +
+ "join cassandra.test_keyspace.`nation` s on e.n_name = s.n_name where
e.n_nationkey = 'algeria'";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("CassandraFilter")
+ .match();
+ }
+
+ @Test
+ public void testLimitPushDown() throws Exception {
+ queryBuilder()
+ .sql("select n_nationkey from cassandra.test_keyspace.`nation` limit
3")
+ .planMatcher()
+ .include("CassandraLimit.*fetch")
+ .match();
+ }
+}
diff --git
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
new file mode 100644
index 0000000..e7aecaa
--- /dev/null
+++
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+public class CassandraQueryTest extends BaseCassandraTest {
+
+ @Test
+ public void testSelectAll() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from cassandra.test_keyspace.`employee`")
+ .unOrdered()
+ .baselineColumns("employee_id", "full_name", "first_name",
"last_name", "position_id",
+ "position_title", "store_id", "department_id", "birth_date",
"hire_date", "salary",
+ "supervisor_id", "education_level", "marital_status", "gender",
"management_role")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President",
0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree",
"S", "F", "Senior Management")
+ .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP
Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1,
"Graduate Degree", "M", "M", "Senior Management")
+ .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP
Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1,
"Graduate Degree", "S", "M", "Senior Management")
+ .baselineValues(5L, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP
Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0f, 1,
"Bachelors Degree", "M", "F", "Senior Management")
+ .baselineValues(6L, "Roberta Damstra", "Roberta", "Damstra", 3, "VP
Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0f, 1,
"Bachelors Degree", "M", "F", "Senior Management")
+ .baselineValues(7L, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP
Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0f, 1,
"Bachelors Degree", "M", "F", "Senior Management")
+ .baselineValues(8L, "Kim Brunner", "Kim", "Brunner", 11, "Store
Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0f, 5, "Bachelors
Degree", "S", "F", "Store Management")
+ .baselineValues(9L, "Brenda Blumberg", "Brenda", "Blumberg", 11,
"Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0f, 5,
"Graduate Degree", "M", "F", "Store Management")
+ .baselineValues(10L, "Darren Stanz", "Darren", "Stanz", 5, "VP
Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0f, 1, "Partial
College", "M", "M", "Senior Management")
+ .baselineValues(11L, "Jonathan Murraiin", "Jonathan", "Murraiin", 11,
"Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0f, 5,
"Graduate Degree", "S", "M", "Store Management")
+ .go();
+ }
+
+ @Test
+ public void testSelectColumns() throws Exception {
+ testBuilder()
+ .sqlQuery("select full_name, birth_date from
cassandra.test_keyspace.`employee`")
+ .unOrdered()
+ .baselineColumns("full_name", "birth_date")
+ .baselineValues("Sheri Nowmer", "1961-08-26")
+ .baselineValues("Derrick Whelply", "1915-07-03")
+ .baselineValues("Michael Spence", "1969-06-20")
+ .baselineValues("Maya Gutierrez", "1951-05-10")
+ .baselineValues("Roberta Damstra", "1942-10-08")
+ .baselineValues("Rebecca Kanagaki", "1949-03-27")
+ .baselineValues("Kim Brunner", "1922-08-10")
+ .baselineValues("Brenda Blumberg", "1979-06-23")
+ .baselineValues("Darren Stanz", "1949-08-26")
+ .baselineValues("Jonathan Murraiin", "1967-06-20")
+ .go();
+ }
+
+ @Test
+ public void testSelectAllFiltered() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from cassandra.test_keyspace.`employee` where
employee_id = 1")
+ .unOrdered()
+ .baselineColumns("employee_id", "full_name", "first_name",
"last_name", "position_id",
+ "position_title", "store_id", "department_id", "birth_date",
"hire_date", "salary",
+ "supervisor_id", "education_level", "marital_status", "gender",
"management_role")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President",
0, 1, "1961-08-26",
+ "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F",
"Senior Management")
+ .go();
+ }
+
+ @Test
+ public void testSelectColumnsFiltered() throws Exception {
+ testBuilder()
+ .sqlQuery("select full_name, birth_date from
cassandra.test_keyspace.`employee` where employee_id = 1 and first_name =
'Sheri'")
+ .unOrdered()
+ .baselineColumns("full_name", "birth_date")
+ .baselineValues("Sheri Nowmer", "1961-08-26")
+ .go();
+ }
+
+ @Test
+ public void testSelectColumnsUnsupportedFilter() throws Exception {
+ // Calcite doesn't support LIKE and other functions yet, so ensure Drill
filter is used there
+ testBuilder()
+ .sqlQuery("select full_name, birth_date from
cassandra.test_keyspace.`employee` where first_name like 'Sh%'")
+ .unOrdered()
+ .baselineColumns("full_name", "birth_date")
+ .baselineValues("Sheri Nowmer", "1961-08-26")
+ .go();
+ }
+
+ @Test
+ public void testSelectColumnsUnsupportedProject() throws Exception {
+ // Calcite doesn't support LIKE and other functions yet, so ensure Drill
project is used there
+ testBuilder()
+ .sqlQuery("select first_name like 'Sh%' as c, birth_date from
cassandra.test_keyspace.`employee` where first_name like 'Sh%'")
+ .unOrdered()
+ .baselineColumns("c", "birth_date")
+ .baselineValues(true, "1961-08-26")
+ .go();
+ }
+
+ @Test
+ public void testSelectColumnsUnsupportedAggregate() throws Exception {
+ // Calcite doesn't support stddev_samp and other functions yet, so ensure
Drill agg is used there
+ testBuilder()
+ .sqlQuery("select stddev_samp(salary) as standard_deviation from
cassandra.test_keyspace.`employee`")
+ .unOrdered()
+ .baselineColumns("standard_deviation")
+ .baselineValues(21333.593748410563)
+ .go();
+ }
+
+ @Test
+ public void testLimitWithSort() throws Exception {
+ testBuilder()
+ .sqlQuery("select full_name, birth_date from
cassandra.test_keyspace.`employee` order by employee_id limit 3")
+ .ordered()
+ .baselineColumns("full_name", "birth_date")
+ .baselineValues("Sheri Nowmer", "1961-08-26")
+ .baselineValues("Derrick Whelply", "1915-07-03")
+ .baselineValues("Michael Spence", "1969-06-20")
+ .go();
+ }
+
+ @Test
+ public void testSelectAllWithLimitAndSort() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from cassandra.test_keyspace.`employee` order by
employee_id limit 3")
+ .ordered()
+ .baselineColumns("employee_id", "full_name", "first_name",
"last_name", "position_id",
+ "position_title", "store_id", "department_id", "birth_date",
"hire_date", "salary",
+ "supervisor_id", "education_level", "marital_status", "gender",
"management_role")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President",
0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree",
"S", "F", "Senior Management")
+ .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP
Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1,
"Graduate Degree", "M", "M", "Senior Management")
+ .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP
Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1,
"Graduate Degree", "S", "M", "Senior Management")
+ .go();
+ }
+
+ @Test
+ public void testSingleColumn() throws Exception {
+ testBuilder()
+ .sqlQuery("select full_name from cassandra.test_keyspace.`employee`")
+ .unOrdered()
+ .baselineColumns("full_name")
+ .baselineValues("Sheri Nowmer")
+ .baselineValues("Derrick Whelply")
+ .baselineValues("Michael Spence")
+ .baselineValues("Maya Gutierrez")
+ .baselineValues("Roberta Damstra")
+ .baselineValues("Rebecca Kanagaki")
+ .baselineValues("Kim Brunner")
+ .baselineValues("Brenda Blumberg")
+ .baselineValues("Darren Stanz")
+ .baselineValues("Jonathan Murraiin")
+ .go();
+ }
+
+ @Test
+ public void testJoin() throws Exception {
+ testBuilder()
+ .sqlQuery("select t1.full_name, t2.birth_date from
cassandra.test_keyspace.`employee` t1 join cassandra.test_keyspace.`employee`
t2 on t1.employee_id = t2.employee_id")
+ .unOrdered()
+ .baselineColumns("full_name", "birth_date")
+ .baselineValues("Sheri Nowmer", "1961-08-26")
+ .baselineValues("Derrick Whelply", "1915-07-03")
+ .baselineValues("Michael Spence", "1969-06-20")
+ .baselineValues("Maya Gutierrez", "1951-05-10")
+ .baselineValues("Roberta Damstra", "1942-10-08")
+ .baselineValues("Rebecca Kanagaki", "1949-03-27")
+ .baselineValues("Kim Brunner", "1922-08-10")
+ .baselineValues("Brenda Blumberg", "1979-06-23")
+ .baselineValues("Darren Stanz", "1949-08-26")
+ .baselineValues("Jonathan Murraiin", "1967-06-20")
+ .go();
+ }
+
+ @Test
+ public void testJoinWithFileTable() throws Exception {
+ testBuilder()
+ .sqlQuery("select t1.full_name, t2.birth_date from
cassandra.test_keyspace.`employee` t1 join cp.`employee.json` t2 on
t1.employee_id = t2.employee_id")
+ .unOrdered()
+ .baselineColumns("full_name", "birth_date")
+ .baselineValues("Sheri Nowmer", "1961-08-26")
+ .baselineValues("Derrick Whelply", "1915-07-03")
+ .baselineValues("Michael Spence", "1969-06-20")
+ .baselineValues("Maya Gutierrez", "1951-05-10")
+ .baselineValues("Roberta Damstra", "1942-10-08")
+ .baselineValues("Rebecca Kanagaki", "1949-03-27")
+ .baselineValues("Kim Brunner", "1922-08-10")
+ .baselineValues("Brenda Blumberg", "1979-06-23")
+ .baselineValues("Darren Stanz", "1949-08-26")
+ .baselineValues("Jonathan Murraiin", "1967-06-20")
+ .go();
+ }
+
+ @Test
+ public void testAggregate() throws Exception {
+ testBuilder()
+ .sqlQuery("select count(*) c from cassandra.test_keyspace.`employee`")
+ .ordered()
+ .baselineColumns("c")
+ .baselineValues(10L)
+ .go();
+ }
+
+ @Test
+ public void testAggregateWithGroupBy() throws Exception {
+ testBuilder()
+ .sqlQuery("select sum(`salary`) sal, department_id from
cassandra.test_keyspace.`employee` e group by e.`department_id`")
+ .unOrdered()
+ .baselineColumns("sal", "department_id")
+ .baselineValues(195000.0, 1)
+ .baselineValues(42000.0, 11)
+ .baselineValues(25000.0, 2)
+ .baselineValues(15000.0, 3)
+ .baselineValues(50000.0, 5)
+ .go();
+ }
+
+ @Test
+ public void testSelectNonExistingColumn() throws Exception {
+ try {
+ queryBuilder().sql("select full_name123 from
cassandra.test_keyspace.`employee` order by employee_id limit 3").run();
+ fail("Query didn't fail");
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString("VALIDATION ERROR"));
+ assertThat(e.getMessage(), containsString("Column 'full_name123' not
found in any table"));
+ }
+ }
+
+ @Test
+ public void testSelectLiterals() throws Exception {
+ testBuilder()
+ .sqlQuery("select 'abc' as full_name, 123 as id from
cassandra.test_keyspace.`employee` limit 3")
+ .unOrdered()
+ .baselineColumns("full_name", "id")
+ .baselineValues("abc", 123)
+ .baselineValues("abc", 123)
+ .baselineValues("abc", 123)
+ .go();
+ }
+
+ @Test
+ public void testSelectIntLiterals() throws Exception {
+ testBuilder()
+ .sqlQuery("select 333 as full_name, 123 as id from
cassandra.test_keyspace.`employee` limit 3")
+ .unOrdered()
+ .baselineColumns("full_name", "id")
+ .baselineValues(333, 123)
+ .baselineValues(333, 123)
+ .baselineValues(333, 123)
+ .go();
+ }
+
+ @Test
+ public void testSelectLiteralWithStar() throws Exception {
+ testBuilder()
+ .sqlQuery("select *, 123 as full_name from
cassandra.test_keyspace.`employee` order by employee_id limit 3")
+ .unOrdered()
+ .baselineColumns("full_name")
+ .baselineColumns("employee_id", "full_name", "first_name",
"last_name", "position_id",
+ "position_title", "store_id", "department_id", "birth_date",
"hire_date", "salary",
+ "supervisor_id", "education_level", "marital_status", "gender",
"management_role", "full_name0")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President",
0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree",
"S", "F", "Senior Management", 123)
+ .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP
Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1,
"Graduate Degree", "M", "M", "Senior Management", 123)
+ .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP
Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1,
"Graduate Degree", "S", "M", "Senior Management", 123)
+ .go();
+ }
+
+ @Test
+ public void testLiteralWithAggregateAndGroupBy() throws Exception {
+ testBuilder()
+ .sqlQuery("select sum(`salary`) sal, 1 as department_id from
cassandra.test_keyspace.`employee` e group by e.`department_id`")
+ .unOrdered()
+ .baselineColumns("sal", "department_id")
+ .baselineValues(195000.0, 1)
+ .baselineValues(42000.0, 1)
+ .baselineValues(25000.0, 1)
+ .baselineValues(15000.0, 1)
+ .baselineValues(50000.0, 1)
+ .go();
+ }
+
+ @Test
+ public void testSelectNonExistingTable() throws Exception {
+ try {
+ queryBuilder().sql("select full_name from
cassandra.test_keyspace.`non-existing`").run();
+ fail("Query didn't fail");
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString("VALIDATION ERROR"));
+ assertThat(e.getMessage(), containsString("Object 'non-existing' not
found within 'cassandra.test_keyspace'"));
+ }
+ }
+
+ @Test
+ public void testSelectNonExistingSubSchema() throws Exception {
+ try {
+ queryBuilder().sql("select full_name from
cassandra.test_keyspace.`non-existing`.employee").run();
+ fail("Query didn't fail");
+ } catch (UserRemoteException e) {
+ assertThat(e.getMessage(), containsString("VALIDATION ERROR"));
+ assertThat(e.getMessage(), containsString("Schema [[cassandra,
test_keyspace, non-existing]] is not valid with respect to either root schema
or current default schema"));
+ }
+ }
+
+ @Test
+ public void testWithProvidedSchema() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from " +
+
"table(cassandra.test_keyspace.`employee`(schema=>'inline=(birth_date date not
null, salary decimal(10, 2))')) " +
+ "where first_name = 'Sheri'")
+ .ordered()
+ .baselineColumns("employee_id", "full_name", "first_name",
"last_name", "position_id",
+ "position_title", "store_id", "department_id", "birth_date",
"hire_date", "salary",
+ "supervisor_id", "education_level", "marital_status", "gender",
"management_role")
+ .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President",
0, 1, LocalDate.parse("1961-08-26"),
+ "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate
Degree", "S", "F", "Senior Management")
+ .go();
+ }
+}
diff --git a/contrib/storage-cassandra/src/test/resources/queries.cql
b/contrib/storage-cassandra/src/test/resources/queries.cql
new file mode 100644
index 0000000..534a604
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/resources/queries.cql
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE KEYSPACE test_keyspace
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+
+CREATE TABLE test_keyspace.employee (
+ employee_id bigint,
+ full_name text,
+ first_name text,
+ last_name text,
+ position_id int,
+ position_title text,
+ store_id int,
+ department_id int,
+ birth_date text,
+ hire_date text,
+ salary float,
+ supervisor_id int,
+ education_level text,
+ marital_status text,
+ gender text,
+ management_role text,
+ PRIMARY KEY (full_name, employee_id)
+) WITH CLUSTERING ORDER BY (employee_id ASC);
+
+USE test_keyspace;
+
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (1, 'Sheri Nowmer', 'Sheri', 'Nowmer', 1, 'President',0,1,
'1961-08-26', '1994-12-01 00:00:00.0',80000.0 ,0, 'Graduate Degree', 'S', 'F',
'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (2, 'Derrick Whelply', 'Derrick', 'Whelply', 2, 'VP Country
Manager',0,1, '1915-07-03', '1994-12-01 00:00:00.0',40000.0 ,1, 'Graduate
Degree', 'M', 'M', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (4, 'Michael Spence', 'Michael', 'Spence', 2, 'VP Country
Manager',0,1, '1969-06-20', '1998-01-01 00:00:00.0',40000.0 ,1, 'Graduate
Degree', 'S', 'M', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (5, 'Maya Gutierrez', 'Maya', 'Gutierrez', 2, 'VP Country
Manager',0,1, '1951-05-10', '1998-01-01 00:00:00.0',35000.0 ,1, 'Bachelors
Degree', 'M', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (6, 'Roberta Damstra', 'Roberta', 'Damstra', 3, 'VP Information
Systems',0,2, '1942-10-08', '1994-12-01 00:00:00.0',25000.0 ,1, 'Bachelors
Degree', 'M', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (7, 'Rebecca Kanagaki', 'Rebecca', 'Kanagaki', 4, 'VP Human
Resources',0,3, '1949-03-27', '1994-12-01 00:00:00.0',15000.0 ,1, 'Bachelors
Degree', 'M', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (8, 'Kim Brunner', 'Kim', 'Brunner', 11, 'Store Manager',9,11,
'1922-08-10', '1998-01-01 00:00:00.0',10000.0 ,5, 'Bachelors Degree', 'S', 'F',
'Store Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (9, 'Brenda Blumberg', 'Brenda', 'Blumberg', 11, 'Store
Manager',21,11, '1979-06-23', '1998-01-01 00:00:00.0',17000.0 ,5, 'Graduate
Degree', 'M', 'F', 'Store Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (10, 'Darren Stanz', 'Darren', 'Stanz', 5, 'VP Finance',0,5,
'1949-08-26', '1994-12-01 00:00:00.0',50000.0 ,1, 'Partial College', 'M', 'M',
'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name,
position_id, position_title, store_id, department_id, birth_date, hire_date,
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (11, 'Jonathan Murraiin', 'Jonathan', 'Murraiin', 11, 'Store
Manager',1,11, '1967-06-20', '1998-01-01 00:00:00.0',15000.0 ,5, 'Graduate
Degree', 'S', 'M', 'Store Management');
+
+CREATE TABLE test_keyspace.nation (
+ n_nationkey bigint,
+ n_name text,
+ n_regionkey int,
+ PRIMARY KEY (n_nationkey, n_regionkey)
+) WITH CLUSTERING ORDER BY (n_regionkey ASC);
+
+INSERT INTO nation(n_nationkey, n_name, n_regionkey)
+VALUES (0, 'ALGERIA', 1);
+
+CREATE TABLE arr (
+ f_int int PRIMARY KEY,
+ string_arr list<text>,
+ int_arr list<int>,
+ int_set set<int>
+);
+
+INSERT INTO arr (f_int, string_arr, int_arr, int_set)
+VALUES (0, ['a', 'b', 'c', 'd'], [1, 2, 3, 4, 0], {9, 8, 7, 6, 5});
+
+CREATE TABLE map (
+ prim_field int PRIMARY KEY,
+ nest_field map<text, text>,
+ more_nest_field map<text, frozen<map<text, text>>>,
+ map_arr list<frozen<map<text, int>>>
+);
+
+INSERT INTO map (prim_field, nest_field, more_nest_field, map_arr)
+VALUES (0, {'a':'123', 'b':'abc'}, {'a':{'b':'abc'}}, [{'a':123, 'b':321},
{'c':456, 'd':789}]);
+
+CREATE TABLE tuple (
+ prim_field int PRIMARY KEY,
+ tuple_field tuple<int, text, double>
+);
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 6adce9d..46323a1 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -334,6 +334,11 @@
</dependency>
<dependency>
<groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-storage-cassandra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
<artifactId>drill-storage-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/distribution/src/assemble/component.xml
b/distribution/src/assemble/component.xml
index 334ff4e..e140056 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -55,6 +55,7 @@
<include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
<include>org.apache.drill.contrib:drill-storage-kafka:jar</include>
<include>org.apache.drill.contrib:drill-storage-elasticsearch:jar</include>
+ <include>org.apache.drill.contrib:drill-storage-cassandra:jar</include>
<include>org.apache.drill.contrib:drill-storage-http:jar</include>
<include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
<include>org.apache.drill.contrib:drill-udfs:jar</include>
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
index 3688972..d81db76 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -101,7 +102,14 @@ public interface ColumnConverter {
return;
}
- Iterable<?> array = (Iterable<?>) value;
+ Iterable<?> array;
+ if (value instanceof Iterable) {
+ array = (Iterable<?>) value;
+ } else if (value.getClass().isArray()) {
+ array = Arrays.asList(((Object[]) value));
+ } else {
+ throw new IllegalStateException("Invalid value type for list
ArrayColumnConverter: " + value.getClass());
+ }
array.forEach(arrayValue -> {
valueConverter.convert(arrayValue);
arrayWriter.save();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
index 43b9216..83f8f72 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
@@ -24,7 +24,7 @@ import java.util.Map;
public interface EnumerablePrelContext {
- String generateCode(RelOptCluster cluster, RelNode elasticNode);
+ String generateCode(RelOptCluster cluster, RelNode relNode);
RelNode transformNode(RelNode input);
diff --git a/pom.xml b/pom.xml
index 105d509..fb0412c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
avoid_bad_dependencies plugin found in the file.
-->
<calcite.groupId>com.github.vvysotskyi.drill-calcite</calcite.groupId>
- <calcite.version>1.21.0-drill-r1</calcite.version>
+ <calcite.version>1.21.0-drill-r2</calcite.version>
<avatica.version>1.15.0</avatica.version>
<janino.version>3.0.11</janino.version>
<sqlline.version>1.9.0</sqlline.version>