This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 75e4877  DRILL-92: Cassandra storage plugin implementation based on 
Calcite adapter (#2152)
75e4877 is described below

commit 75e4877b46380188b6545b99f188b764b2a576aa
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri Jan 29 10:04:32 2021 +0200

    DRILL-92: Cassandra storage plugin implementation based on Calcite adapter 
(#2152)
---
 contrib/pom.xml                                    |   1 +
 contrib/storage-cassandra/README.md                |  47 +++
 contrib/storage-cassandra/pom.xml                  | 107 +++++++
 .../java/com/codahale/metrics/JmxReporter.java     |  66 ++++
 .../calcite/adapter/cassandra/CalciteUtils.java    |  65 ++++
 .../store/cassandra/CassandraStorageConfig.java    |  96 ++++++
 .../store/cassandra/CassandraStoragePlugin.java    |  76 +++++
 .../plan/CassandraEnumerablePrelContext.java       |  81 +++++
 .../cassandra/plan/DrillCassandraLimitRule.java    |  43 +++
 .../cassandra/schema/CassandraDrillSchema.java     |  80 +++++
 .../cassandra/schema/CassandraDynamicTable.java    |  95 ++++++
 .../cassandra/schema/CassandraRootDrillSchema.java |  83 +++++
 .../schema/CassandraRootDrillSchemaFactory.java    |  45 +++
 .../main/resources/bootstrap-storage-plugins.json  |  11 +
 .../src/main/resources/drill-module.conf           |  24 ++
 .../exec/store/cassandra/BaseCassandraTest.java    |  84 ++++++
 .../store/cassandra/CassandraComplexTypesTest.java |  69 +++++
 .../exec/store/cassandra/CassandraPlanTest.java    |  63 ++++
 .../exec/store/cassandra/CassandraQueryTest.java   | 335 +++++++++++++++++++++
 .../src/test/resources/queries.cql                 |  96 ++++++
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |   1 +
 .../apache/drill/exec/record/ColumnConverter.java  |  10 +-
 .../enumerable/plan/EnumerablePrelContext.java     |   2 +-
 pom.xml                                            |   2 +-
 25 files changed, 1584 insertions(+), 3 deletions(-)

diff --git a/contrib/pom.xml b/contrib/pom.xml
index 40c66f1..c94deff 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -61,6 +61,7 @@
     <module>storage-http</module>
     <module>storage-druid</module>
     <module>storage-elasticsearch</module>
+    <module>storage-cassandra</module>
   </modules>
 
 </project>
diff --git a/contrib/storage-cassandra/README.md 
b/contrib/storage-cassandra/README.md
new file mode 100644
index 0000000..07d58ab
--- /dev/null
+++ b/contrib/storage-cassandra/README.md
@@ -0,0 +1,47 @@
+# Drill Cassandra Plugin
+
+Drill Cassandra storage plugin allows you to perform SQL queries against 
Cassandra tables.
+This storage plugin implementation is based on [Apache Calcite adapter for 
Cassandra](https://calcite.apache.org/docs/cassandra_adapter.html).
+
+This storage plugin may be used for querying Scylla DB.
+
+### Supported optimizations and features
+
+This storage plugin supports the following optimizations:
+
+- Project pushdown
+- Filter pushdown (only expressions supported by Calcite adapter for Cassandra)
+- Limit pushdown
+
+Except for these optimizations, Cassandra storage plugin supports the schema 
provisioning feature.
+For more details please refer to [Specifying the Schema as Table Function 
Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter).
+
+### Plugin registration
+
+The plugin can be registered in Apache Drill using the drill web interface by 
navigating to the `storage` page.
+Following is the default registration configuration.
+
+```json
+{
+  "type" : "cassandra",
+  "host" : "localhost",
+  "port" : 9042,
+  "username" : null,
+  "password" : null,
+  "enabled": false
+}
+```
+
+### Developer notes
+
+Most of the common classes required for creating storage plugins based on 
Calcite adapters are placed in the 
+`java-exec` module, so they can be reused in future plugin implementations.
+
+Here is the list of the classes that may be useful:
+
+- `VertexDrelConverterRule` with `VertexDrel` - used to hold plugin-specific 
part of the plan at the end of the 
+  `LOGICAL` planning phase.
+- `EnumerableIntermediatePrelConverterRule` with `EnumerableIntermediatePrel` 
- the same as above, but for the 
+  `PHYSICAL` planning phase.
+- `EnumerablePrel` - responsible for generating java code that will be 
executed to query the storage plugin data source.
+- `EnumerableRecordReader` - executes java code generated in `EnumerablePrel` 
and transforms obtained results to Drill internal representation.
diff --git a/contrib/storage-cassandra/pom.xml 
b/contrib/storage-cassandra/pom.xml
new file mode 100644
index 0000000..6cc7f83
--- /dev/null
+++ b/contrib/storage-cassandra/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.19.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-cassandra</artifactId>
+
+  <name>contrib/cassandra-storage-plugin</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${calcite.groupId}</groupId>
+      <artifactId>calcite-cassandra</artifactId>
+      <version>${calcite.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.datastax.cassandra</groupId>
+          <artifactId>cassandra-driver-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.scylladb</groupId>
+      <artifactId>scylla-driver-core</artifactId>
+      <version>3.10.1-scylla-0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.ow2.asm</groupId>
+      <artifactId>asm</artifactId>
+      <version>${asm.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.github.nosan</groupId>
+      <artifactId>embedded-cassandra</artifactId>
+      <version>4.0.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.hydromatic</groupId>
+      <artifactId>foodmart-data-json</artifactId>
+      <version>0.4</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkCount combine.self="override">1</forkCount>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git 
a/contrib/storage-cassandra/src/main/java/com/codahale/metrics/JmxReporter.java 
b/contrib/storage-cassandra/src/main/java/com/codahale/metrics/JmxReporter.java
new file mode 100644
index 0000000..970ded1
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/com/codahale/metrics/JmxReporter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.codahale.metrics;
+
+import java.io.Closeable;
+
+/**
+ * Adapter for compatibility of metrics-jms for 3 and 4 versions.
+ */
+public class JmxReporter implements Reporter, Closeable {
+
+  private final com.codahale.metrics.jmx.JmxReporter delegate;
+
+  public JmxReporter(com.codahale.metrics.jmx.JmxReporter delegate) {
+    this.delegate = delegate;
+  }
+
+  public static Builder forRegistry(MetricRegistry registry) {
+    return new Builder(registry);
+  }
+
+  public void start() {
+    delegate.start();
+  }
+
+  public void stop() {
+    delegate.stop();
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  public static class Builder {
+    private final com.codahale.metrics.jmx.JmxReporter.Builder delegate;
+
+    public Builder(MetricRegistry registry) {
+      delegate = com.codahale.metrics.jmx.JmxReporter.forRegistry(registry);
+    }
+
+    public Builder inDomain(String domain) {
+      delegate.inDomain(domain);
+      return this;
+    }
+
+    public JmxReporter build() {
+      return new JmxReporter(delegate.build());
+    }
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CalciteUtils.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CalciteUtils.java
new file mode 100644
index 0000000..e996522
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CalciteUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import com.datastax.driver.core.Session;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.cassandra.CassandraStorageConfig;
+import 
org.apache.drill.exec.store.cassandra.plan.CassandraEnumerablePrelContext;
+import org.apache.drill.exec.store.cassandra.plan.DrillCassandraLimitRule;
+import org.apache.drill.exec.store.cassandra.schema.CassandraDrillSchema;
+import 
org.apache.drill.exec.store.enumerable.plan.EnumerableIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CalciteUtils {
+  private static final VertexDrelConverterRule VERTEX_DREL_CONVERTER_RULE =
+      new VertexDrelConverterRule(CassandraRel.CONVENTION);
+
+  private static final ConverterRule 
ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE =
+      new EnumerableIntermediatePrelConverterRule(
+          new CassandraEnumerablePrelContext(CassandraStorageConfig.NAME));
+
+  public static CassandraTableScan tableScanCreator(RelOptCluster cluster, 
RelTraitSet traitSet,
+      RelOptTable table, CassandraTable cassandraTable,
+      RelDataType projectRowType) {
+    return new CassandraTableScan(cluster, traitSet, table, cassandraTable, 
projectRowType);
+  }
+
+  public static Set<RelOptRule> cassandraRules() {
+    Set<RelOptRule> rules = Arrays.stream(CassandraRules.RULES)
+        .collect(Collectors.toSet());
+    rules.add(DrillCassandraLimitRule.INSTANCE);
+    rules.add(ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE);
+    rules.add(VERTEX_DREL_CONVERTER_RULE);
+    return rules;
+  }
+
+  public static Session getSession(SchemaPlus schema) {
+    return 
schema.unwrap(CassandraDrillSchema.class).getDelegatingSchema().session;
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
new file mode 100644
index 0000000..837bb58
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(CassandraStorageConfig.NAME)
+public class CassandraStorageConfig extends StoragePluginConfig {
+  public static final String NAME = "cassandra";
+
+  private final String host;
+  private final String username;
+  private final String password;
+  private final int port;
+
+  @JsonCreator
+  public CassandraStorageConfig(
+      @JsonProperty("host") String host,
+      @JsonProperty("port") int port,
+      @JsonProperty("username") String username,
+      @JsonProperty("password") String password) {
+    this.host = host;
+    this.username = username;
+    this.password = password;
+    this.port = port;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @JsonIgnore
+  public Map<String, Object> toConfigMap() {
+    Map<String, Object> result = new HashMap<>();
+
+    result.put("host", host);
+    result.put("port", port);
+    result.put("username", username);
+    result.put("password", password);
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CassandraStorageConfig that = (CassandraStorageConfig) o;
+    return Objects.equals(host, that.host)
+        && Objects.equals(username, that.username)
+        && Objects.equals(password, that.password);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(host, username, password);
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
new file mode 100644
index 0000000..d5f1a8a
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.apache.calcite.adapter.cassandra.CalciteUtils;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import 
org.apache.drill.exec.store.cassandra.schema.CassandraRootDrillSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class CassandraStoragePlugin extends AbstractStoragePlugin {
+
+  private final CassandraStorageConfig config;
+  private final SchemaFactory schemaFactory;
+
+  public CassandraStoragePlugin(
+      CassandraStorageConfig config, DrillbitContext context, String name) {
+    super(context, name);
+    this.config = config;
+    this.schemaFactory = new CassandraRootDrillSchemaFactory(name, this);
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public CassandraStorageConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext 
optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return CalciteUtils.cassandraRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return ImmutableSet.of();
+    }
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
new file mode 100644
index 0000000..06cf181
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/CassandraEnumerablePrelContext.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.plan;
+
+import org.apache.calcite.adapter.cassandra.CassandraToEnumerableConverterRule;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.store.SubsetRemover;
+import org.apache.drill.exec.store.enumerable.plan.EnumerablePrelContext;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class CassandraEnumerablePrelContext implements EnumerablePrelContext {
+  private final String planPrefix;
+
+  public CassandraEnumerablePrelContext(String planPrefix) {
+    this.planPrefix = planPrefix;
+  }
+
+  @Override
+  public String generateCode(RelOptCluster cluster, RelNode relNode) {
+    RelNode enumerableRel =
+        CassandraToEnumerableConverterRule.INSTANCE.convert(relNode);
+
+    ClassDeclaration classDeclaration = new 
EnumerableRelImplementor(cluster.getRexBuilder(), Collections.emptyMap())
+        .implementRoot((EnumerableRel) enumerableRel, 
EnumerableRel.Prefer.ARRAY);
+    return Expressions.toString(Collections.singletonList(classDeclaration), 
"\n", false);
+  }
+
+  @Override
+  public RelNode transformNode(RelNode input) {
+    return input.accept(SubsetRemover.INSTANCE);
+  }
+
+  @Override
+  public Map<String, Integer> getFieldsMap(RelNode transformedNode) {
+    return transformedNode.getRowType().getFieldList().stream()
+        .collect(Collectors.toMap(
+            RelDataTypeField::getName,
+            RelDataTypeField::getIndex
+        ));
+  }
+
+  @Override
+  public String getPlanPrefix() {
+    return planPrefix;
+  }
+
+  @Override
+  public String getTablePath(RelNode input) {
+    TableScan scan = Objects.requireNonNull(DrillRelOptUtil.findScan(input));
+    List<String> qualifiedName = scan.getTable().getQualifiedName();
+    return String.join(".", qualifiedName.subList(0, qualifiedName.size() - 
1));
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/DrillCassandraLimitRule.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/DrillCassandraLimitRule.java
new file mode 100644
index 0000000..6c4baf4
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/plan/DrillCassandraLimitRule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.plan;
+
+import org.apache.calcite.adapter.cassandra.CassandraLimit;
+import org.apache.calcite.adapter.cassandra.CassandraRel;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillRel;
+
+public class DrillCassandraLimitRule extends ConverterRule {
+  public static final DrillCassandraLimitRule INSTANCE = new 
DrillCassandraLimitRule();
+
+  private DrillCassandraLimitRule() {
+    super(DrillLimitRelBase.class, DrillRel.DRILL_LOGICAL, 
CassandraRel.CONVENTION,
+        "DrillCassandraLimitRule");
+  }
+
+  public RelNode convert(RelNode relNode) {
+    DrillLimitRelBase limit = (DrillLimitRelBase) relNode;
+    final RelTraitSet traitSet =
+        limit.getTraitSet().replace(CassandraRel.CONVENTION);
+    return new CassandraLimit(limit.getCluster(), traitSet,
+        convert(limit.getInput(), CassandraRel.CONVENTION), limit.getOffset(), 
limit.getFetch());
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDrillSchema.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDrillSchema.java
new file mode 100644
index 0000000..eb8796f
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDrillSchema.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.cassandra.CassandraSchema;
+import org.apache.calcite.adapter.cassandra.CassandraTable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.cassandra.CassandraStorageConfig;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CassandraDrillSchema extends AbstractSchema {
+  private final CassandraSchema delegatingSchema;
+  private final StoragePlugin plugin;
+  private final Map<String, Table> tables = new ConcurrentHashMap<>();
+
+  public CassandraDrillSchema(String name, StoragePlugin plugin, 
CassandraSchema delegatingSchema) {
+    super(Collections.emptyList(), name);
+    this.plugin = plugin;
+    this.delegatingSchema = delegatingSchema;
+  }
+
+  @Override
+  public String getTypeName() {
+    return CassandraStorageConfig.NAME;
+  }
+
+  @Override
+  public Table getTable(String tableName) {
+    return tables.computeIfAbsent(tableName, this::getDrillTable);
+  }
+
+  private DrillTable getDrillTable(String tableName) {
+    CassandraTable table = (CassandraTable) 
delegatingSchema.getTable(tableName);
+    return table == null ? null
+        : new CassandraDynamicTable(plugin, tableName, null, table);
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return delegatingSchema.getTableNames();
+  }
+
+  @Override
+  public Expression getExpression(SchemaPlus parentSchema, String name) {
+    return Expressions.call(
+        DataContext.ROOT,
+        BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
+  }
+
+  public CassandraSchema getDelegatingSchema() {
+    return delegatingSchema;
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDynamicTable.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDynamicTable.java
new file mode 100644
index 0000000..8b1d58d
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraDynamicTable.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.adapter.cassandra.CassandraRel;
+import org.apache.calcite.adapter.cassandra.CassandraTable;
+import org.apache.calcite.adapter.cassandra.CalciteUtils;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.Wrapper;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+public class CassandraDynamicTable extends DrillTable implements 
TranslatableTable, QueryableTable, Wrapper {
+
+  private final CassandraTable table;
+
+  public CassandraDynamicTable(StoragePlugin plugin, String storageEngineName, 
Object selection, CassandraTable table) {
+    super(storageEngineName, plugin, selection);
+    this.table = table;
+  }
+
+  @Override
+  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable 
relOptTable) {
+    RelOptCluster cluster = context.getCluster();
+    return CalciteUtils.tableScanCreator(cluster, 
cluster.traitSetOf(CassandraRel.CONVENTION),
+        relOptTable, table, relOptTable.getRowType());
+  }
+
+  @Override
+  public <V> Queryable<V> asQueryable(QueryProvider queryProvider, SchemaPlus 
schema, String tableName) {
+    CassandraTable cassandraTable = CassandraDynamicTable.this.table;
+    return new CassandraTable.CassandraQueryable<V>(queryProvider, schema, 
cassandraTable, tableName) {
+      public Enumerable<Object> query(List<Map.Entry<String, Class>> fields,
+          List<Map.Entry<String, String>> selectFields, List<String> 
predicates,
+          List<String> order, Integer offset, Integer fetch) {
+        return cassandraTable.query(CalciteUtils.getSession(schema), fields, 
selectFields, predicates,
+            order, offset, fetch);
+      }
+    };
+  }
+
+  @Override
+  public Type getElementType() {
+    return table.getElementType();
+  }
+
+  public <C> C unwrap(Class<C> aClass) {
+    if (aClass.isInstance(this)) {
+      return aClass.cast(this);
+    } else if (aClass.isInstance(table)) {
+      return aClass.cast(table);
+    }
+    return null;
+  }
+
+  @Override
+  public Expression getExpression(SchemaPlus schema, String tableName, Class 
clazz) {
+    return table.getExpression(schema, tableName, clazz);
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return table.getRowType(typeFactory);
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchema.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchema.java
new file mode 100644
index 0000000..513f7ae
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchema.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.cassandra.CassandraSchema;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.cassandra.CassandraStorageConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CassandraRootDrillSchema extends AbstractSchema {
+  private final Map<String, Schema> schemas = new ConcurrentHashMap<>();
+
+  private final StoragePlugin plugin;
+  private final SchemaFactory schemaFactory;
+  private final SchemaPlus parent;
+  private final String parentName;
+  private final Map<String, Object> configMap;
+
+  public CassandraRootDrillSchema(String name, StoragePlugin plugin, 
SchemaFactory schemaFactory,
+      SchemaPlus parent, String parentName, Map<String, Object> configMap) {
+    super(Collections.emptyList(), name);
+    this.plugin = plugin;
+    this.schemaFactory = schemaFactory;
+    this.parent = parent;
+    this.parentName = parentName;
+    this.configMap = configMap;
+  }
+
+  @Override
+  public String getTypeName() {
+    return CassandraStorageConfig.NAME;
+  }
+
+  @Override
+  public Schema getSubSchema(String name) {
+    return schemas.computeIfAbsent(name, this::createSubSchema);
+  }
+
+  private Schema createSubSchema(String schemaName) {
+    Map<String, Object> configs = new HashMap<>(configMap);
+    configs.put("keyspace", schemaName);
+
+    SchemaPlus parentSchema = parent.getSubSchema(parentName);
+    Schema schema = new CassandraDrillSchema(schemaName, plugin,
+        (CassandraSchema) schemaFactory.create(parentSchema, schemaName, 
configs));
+    parentSchema.add(schemaName, schema);
+    return schema;
+  }
+
+  @Override
+  public Expression getExpression(SchemaPlus parentSchema, String name) {
+    return Expressions.call(
+        DataContext.ROOT,
+        BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
new file mode 100644
index 0000000..edcf72b
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra.schema;
+
+import org.apache.calcite.adapter.cassandra.CassandraSchemaFactory;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.cassandra.CassandraStoragePlugin;
+
+public class CassandraRootDrillSchemaFactory extends AbstractSchemaFactory {
+
+  private final CassandraStoragePlugin plugin;
+  private final SchemaFactory calciteSchemaFactory;
+
+  public CassandraRootDrillSchemaFactory(String name, CassandraStoragePlugin 
plugin) {
+    super(name);
+    this.plugin = plugin;
+    this.calciteSchemaFactory = new CassandraSchemaFactory();
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    Schema schema = new CassandraRootDrillSchema(getName(), plugin,
+        calciteSchemaFactory, parent, getName(), 
plugin.getConfig().toConfigMap());
+    parent.add(getName(), schema);
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json 
b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..f433910
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,11 @@
+{
+  "storage":{
+    "cassandra" : {
+      "type" : "cassandra",
+      "host" : "localhost",
+      "port" : 9042,
+      "username" : null,
+      "password" : null
+    }
+  }
+}
diff --git a/contrib/storage-cassandra/src/main/resources/drill-module.conf 
b/contrib/storage-cassandra/src/main/resources/drill-module.conf
new file mode 100755
index 0000000..d23eabc
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see 
https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.cassandra"
+}
diff --git 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
new file mode 100644
index 0000000..9413db2
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.github.nosan.embedded.cassandra.Cassandra;
+import com.github.nosan.embedded.cassandra.CassandraBuilder;
+import com.github.nosan.embedded.cassandra.Settings;
+import com.github.nosan.embedded.cassandra.cql.CqlScript;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.util.ComparableVersion;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.junit.Assume.assumeTrue;
+
+public class BaseCassandraTest extends ClusterTest {
+  private static Cassandra cassandra;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    assumeTrue(
+        "Skipping tests for JDK 12+ since Cassandra supports only versions up 
to 11 (including).",
+        new ComparableVersion(System.getProperty("java.version"))
+            .compareTo(new ComparableVersion("12")) < 0);
+
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    startCassandra();
+
+    CassandraStorageConfig config = new CassandraStorageConfig(
+        cassandra.getSettings().getAddress().getHostAddress(),
+        cassandra.getSettings().getPort(),
+        null,
+        null);
+    config.setEnabled(true);
+    cluster.defineStoragePlugin("cassandra", config);
+
+    prepareData();
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    if (cassandra != null) {
+      cassandra.stop();
+    }
+  }
+
+  private static void startCassandra() {
+    cassandra = new CassandraBuilder().build();
+    cassandra.start();
+  }
+
+  private static void prepareData() {
+    Settings settings = cassandra.getSettings();
+
+    try (Cluster cluster = Cluster.builder()
+        .addContactPoints(settings.getAddress())
+        .withPort(settings.getPort())
+        .withoutMetrics()
+        .withoutJMXReporting()
+        .build()) {
+      Session session = cluster.connect();
+      CqlScript.ofClassPath("queries.cql").forEachStatement(session::execute);
+    }
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraComplexTypesTest.java
 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraComplexTypesTest.java
new file mode 100644
index 0000000..9692369
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraComplexTypesTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.junit.Test;
+
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+
+public class CassandraComplexTypesTest extends BaseCassandraTest {
+
+  @Test
+  public void testSelectStarWithArray() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from cassandra.test_keyspace.arr")
+        .unOrdered()
+        .baselineColumns("f_int", "string_arr", "int_arr", "int_set")
+        .baselineValues(0, listOf("a", "b", "c", "d"), listOf(1, 2, 3, 4, 0),
+            listOf(9, 8, 7, 6, 5))
+        .go();
+  }
+
+  @Test
+  public void testSelectArrayElem() throws Exception {
+    testBuilder()
+        .sqlQuery("select string_arr[0] c1, int_arr[1] c2 from 
cassandra.test_keyspace.arr")
+        .unOrdered()
+        .baselineColumns("c1", "c2")
+        .baselineValues("a", 2)
+        .go();
+  }
+
+  @Test
+  public void testSelectStarWithJson() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from cassandra.test_keyspace.map")
+        .unOrdered()
+        .baselineColumns("prim_field", "nest_field", "more_nest_field", 
"map_arr")
+        .baselineValues(0, mapOf("a", "123", "b", "abc"),
+            mapOf("a", mapOf("b", "abc")),
+            listOf(mapOf("a", 123, "b", 321), mapOf("c", 456, "d", 789)))
+        .go();
+  }
+
+  @Test
+  public void testSelectNestedFields() throws Exception {
+    testBuilder()
+        .sqlQuery("select m.nest_field.a a, m.nest_field.b b from 
cassandra.test_keyspace.map m")
+        .unOrdered()
+        .baselineColumns("a", "b")
+        .baselineValues("123", "abc")
+        .go();
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraPlanTest.java
 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraPlanTest.java
new file mode 100644
index 0000000..0af8d4b
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraPlanTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.junit.Test;
+
+public class CassandraPlanTest extends BaseCassandraTest {
+
+  @Test
+  public void testProjectPushDown() throws Exception {
+    queryBuilder()
+        .sql("select n_name, n_nationkey from 
cassandra.test_keyspace.`nation`")
+        .planMatcher()
+        .include("cassandra=.*n_name.*n_nationkey")
+        .exclude("\\*\\*")
+        .match();
+  }
+
+  @Test
+  public void testFilterPushDown() throws Exception {
+    queryBuilder()
+        .sql("select n_name, n_nationkey from cassandra.test_keyspace.`nation` 
where n_nationkey = 0")
+        .planMatcher()
+        .include("CassandraFilter")
+        .match();
+  }
+
+  @Test
+  public void testFilterPushDownWithJoin() throws Exception {
+    String query = "select * from cassandra.test_keyspace.`nation` e\n" +
+        "join cassandra.test_keyspace.`nation` s on e.n_name = s.n_name where 
e.n_nationkey = 'algeria'";
+
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("CassandraFilter")
+        .match();
+  }
+
+  @Test
+  public void testLimitPushDown() throws Exception {
+    queryBuilder()
+        .sql("select n_nationkey from cassandra.test_keyspace.`nation` limit 
3")
+        .planMatcher()
+        .include("CassandraLimit.*fetch")
+        .match();
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
new file mode 100644
index 0000000..e7aecaa
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraQueryTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+public class CassandraQueryTest extends BaseCassandraTest {
+
+  @Test
+  public void testSelectAll() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from cassandra.test_keyspace.`employee`")
+        .unOrdered()
+        .baselineColumns("employee_id", "full_name", "first_name", 
"last_name", "position_id",
+            "position_title", "store_id", "department_id", "birth_date", 
"hire_date", "salary",
+            "supervisor_id", "education_level", "marital_status", "gender", 
"management_role")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 
0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", 
"S", "F", "Senior Management")
+        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP 
Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, 
"Graduate Degree", "M", "M", "Senior Management")
+        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP 
Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, 
"Graduate Degree", "S", "M", "Senior Management")
+        .baselineValues(5L, "Maya Gutierrez", "Maya", "Gutierrez", 2, "VP 
Country Manager", 0, 1, "1951-05-10", "1998-01-01 00:00:00.0", 35000.0f, 1, 
"Bachelors Degree", "M", "F", "Senior Management")
+        .baselineValues(6L, "Roberta Damstra", "Roberta", "Damstra", 3, "VP 
Information Systems", 0, 2, "1942-10-08", "1994-12-01 00:00:00.0", 25000.0f, 1, 
"Bachelors Degree", "M", "F", "Senior Management")
+        .baselineValues(7L, "Rebecca Kanagaki", "Rebecca", "Kanagaki", 4, "VP 
Human Resources", 0, 3, "1949-03-27", "1994-12-01 00:00:00.0", 15000.0f, 1, 
"Bachelors Degree", "M", "F", "Senior Management")
+        .baselineValues(8L, "Kim Brunner", "Kim", "Brunner", 11, "Store 
Manager", 9, 11, "1922-08-10", "1998-01-01 00:00:00.0", 10000.0f, 5, "Bachelors 
Degree", "S", "F", "Store Management")
+        .baselineValues(9L, "Brenda Blumberg", "Brenda", "Blumberg", 11, 
"Store Manager", 21, 11, "1979-06-23", "1998-01-01 00:00:00.0", 17000.0f, 5, 
"Graduate Degree", "M", "F", "Store Management")
+        .baselineValues(10L, "Darren Stanz", "Darren", "Stanz", 5, "VP 
Finance", 0, 5, "1949-08-26", "1994-12-01 00:00:00.0", 50000.0f, 1, "Partial 
College", "M", "M", "Senior Management")
+        .baselineValues(11L, "Jonathan Murraiin", "Jonathan", "Murraiin", 11, 
"Store Manager", 1, 11, "1967-06-20", "1998-01-01 00:00:00.0", 15000.0f, 5, 
"Graduate Degree", "S", "M", "Store Management")
+        .go();
+  }
+
+  @Test
+  public void testSelectColumns() throws Exception {
+    testBuilder()
+        .sqlQuery("select full_name, birth_date from 
cassandra.test_keyspace.`employee`")
+        .unOrdered()
+        .baselineColumns("full_name", "birth_date")
+        .baselineValues("Sheri Nowmer", "1961-08-26")
+        .baselineValues("Derrick Whelply", "1915-07-03")
+        .baselineValues("Michael Spence", "1969-06-20")
+        .baselineValues("Maya Gutierrez", "1951-05-10")
+        .baselineValues("Roberta Damstra", "1942-10-08")
+        .baselineValues("Rebecca Kanagaki", "1949-03-27")
+        .baselineValues("Kim Brunner", "1922-08-10")
+        .baselineValues("Brenda Blumberg", "1979-06-23")
+        .baselineValues("Darren Stanz", "1949-08-26")
+        .baselineValues("Jonathan Murraiin", "1967-06-20")
+        .go();
+  }
+
+  @Test
+  public void testSelectAllFiltered() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from cassandra.test_keyspace.`employee` where 
employee_id = 1")
+        .unOrdered()
+        .baselineColumns("employee_id", "full_name", "first_name", 
"last_name", "position_id",
+            "position_title", "store_id", "department_id", "birth_date", 
"hire_date", "salary",
+            "supervisor_id", "education_level", "marital_status", "gender", 
"management_role")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 
0, 1, "1961-08-26",
+            "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", "S", "F", 
"Senior Management")
+        .go();
+  }
+
+  @Test
+  public void testSelectColumnsFiltered() throws Exception {
+    testBuilder()
+        .sqlQuery("select full_name, birth_date from 
cassandra.test_keyspace.`employee` where employee_id = 1 and first_name = 
'Sheri'")
+        .unOrdered()
+        .baselineColumns("full_name", "birth_date")
+        .baselineValues("Sheri Nowmer", "1961-08-26")
+        .go();
+  }
+
+  @Test
+  public void testSelectColumnsUnsupportedFilter() throws Exception {
+    // Calcite doesn't support LIKE and other functions yet, so ensure Drill 
filter is used there
+    testBuilder()
+        .sqlQuery("select full_name, birth_date from 
cassandra.test_keyspace.`employee` where first_name like 'Sh%'")
+        .unOrdered()
+        .baselineColumns("full_name", "birth_date")
+        .baselineValues("Sheri Nowmer", "1961-08-26")
+        .go();
+  }
+
+  @Test
+  public void testSelectColumnsUnsupportedProject() throws Exception {
+    // Calcite doesn't support LIKE and other functions yet, so ensure Drill 
project is used there
+    testBuilder()
+        .sqlQuery("select first_name like 'Sh%' as c, birth_date from 
cassandra.test_keyspace.`employee` where first_name like 'Sh%'")
+        .unOrdered()
+        .baselineColumns("c", "birth_date")
+        .baselineValues(true, "1961-08-26")
+        .go();
+  }
+
+  @Test
+  public void testSelectColumnsUnsupportedAggregate() throws Exception {
+    // Calcite doesn't support stddev_samp and other functions yet, so ensure 
Drill agg is used there
+    testBuilder()
+        .sqlQuery("select stddev_samp(salary) as standard_deviation from 
cassandra.test_keyspace.`employee`")
+        .unOrdered()
+        .baselineColumns("standard_deviation")
+        .baselineValues(21333.593748410563)
+        .go();
+  }
+
+  @Test
+  public void testLimitWithSort() throws Exception {
+    testBuilder()
+        .sqlQuery("select full_name, birth_date from 
cassandra.test_keyspace.`employee` order by employee_id limit 3")
+        .ordered()
+        .baselineColumns("full_name", "birth_date")
+        .baselineValues("Sheri Nowmer", "1961-08-26")
+        .baselineValues("Derrick Whelply", "1915-07-03")
+        .baselineValues("Michael Spence", "1969-06-20")
+        .go();
+  }
+
+  @Test
+  public void testSelectAllWithLimitAndSort() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from cassandra.test_keyspace.`employee` order by 
employee_id limit 3")
+        .ordered()
+        .baselineColumns("employee_id", "full_name", "first_name", 
"last_name", "position_id",
+            "position_title", "store_id", "department_id", "birth_date", 
"hire_date", "salary",
+            "supervisor_id", "education_level", "marital_status", "gender", 
"management_role")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 
0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", 
"S", "F", "Senior Management")
+        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP 
Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, 
"Graduate Degree", "M", "M", "Senior Management")
+        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP 
Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, 
"Graduate Degree", "S", "M", "Senior Management")
+        .go();
+  }
+
+  @Test
+  public void testSingleColumn() throws Exception {
+    testBuilder()
+        .sqlQuery("select full_name from cassandra.test_keyspace.`employee`")
+        .unOrdered()
+        .baselineColumns("full_name")
+        .baselineValues("Sheri Nowmer")
+        .baselineValues("Derrick Whelply")
+        .baselineValues("Michael Spence")
+        .baselineValues("Maya Gutierrez")
+        .baselineValues("Roberta Damstra")
+        .baselineValues("Rebecca Kanagaki")
+        .baselineValues("Kim Brunner")
+        .baselineValues("Brenda Blumberg")
+        .baselineValues("Darren Stanz")
+        .baselineValues("Jonathan Murraiin")
+        .go();
+  }
+
+  @Test
+  public void testJoin() throws Exception {
+    testBuilder()
+        .sqlQuery("select t1.full_name, t2.birth_date from 
cassandra.test_keyspace.`employee` t1 join cassandra.test_keyspace.`employee` 
t2 on t1.employee_id = t2.employee_id")
+        .unOrdered()
+        .baselineColumns("full_name", "birth_date")
+        .baselineValues("Sheri Nowmer", "1961-08-26")
+        .baselineValues("Derrick Whelply", "1915-07-03")
+        .baselineValues("Michael Spence", "1969-06-20")
+        .baselineValues("Maya Gutierrez", "1951-05-10")
+        .baselineValues("Roberta Damstra", "1942-10-08")
+        .baselineValues("Rebecca Kanagaki", "1949-03-27")
+        .baselineValues("Kim Brunner", "1922-08-10")
+        .baselineValues("Brenda Blumberg", "1979-06-23")
+        .baselineValues("Darren Stanz", "1949-08-26")
+        .baselineValues("Jonathan Murraiin", "1967-06-20")
+        .go();
+  }
+
+  @Test
+  public void testJoinWithFileTable() throws Exception {
+    testBuilder()
+        .sqlQuery("select t1.full_name, t2.birth_date from 
cassandra.test_keyspace.`employee` t1 join cp.`employee.json` t2 on 
t1.employee_id = t2.employee_id")
+        .unOrdered()
+        .baselineColumns("full_name", "birth_date")
+        .baselineValues("Sheri Nowmer", "1961-08-26")
+        .baselineValues("Derrick Whelply", "1915-07-03")
+        .baselineValues("Michael Spence", "1969-06-20")
+        .baselineValues("Maya Gutierrez", "1951-05-10")
+        .baselineValues("Roberta Damstra", "1942-10-08")
+        .baselineValues("Rebecca Kanagaki", "1949-03-27")
+        .baselineValues("Kim Brunner", "1922-08-10")
+        .baselineValues("Brenda Blumberg", "1979-06-23")
+        .baselineValues("Darren Stanz", "1949-08-26")
+        .baselineValues("Jonathan Murraiin", "1967-06-20")
+        .go();
+  }
+
+  @Test
+  public void testAggregate() throws Exception {
+    testBuilder()
+        .sqlQuery("select count(*) c from cassandra.test_keyspace.`employee`")
+        .ordered()
+        .baselineColumns("c")
+        .baselineValues(10L)
+        .go();
+  }
+
+  @Test
+  public void testAggregateWithGroupBy() throws Exception {
+    testBuilder()
+        .sqlQuery("select sum(`salary`) sal, department_id from 
cassandra.test_keyspace.`employee` e group by e.`department_id`")
+        .unOrdered()
+        .baselineColumns("sal", "department_id")
+        .baselineValues(195000.0, 1)
+        .baselineValues(42000.0, 11)
+        .baselineValues(25000.0, 2)
+        .baselineValues(15000.0, 3)
+        .baselineValues(50000.0, 5)
+        .go();
+  }
+
+   @Test
+    public void testSelectNonExistingColumn() throws Exception {
+      try {
+        queryBuilder().sql("select full_name123 from 
cassandra.test_keyspace.`employee` order by employee_id limit 3").run();
+        fail("Query didn't fail");
+      } catch (UserRemoteException e) {
+        assertThat(e.getMessage(), containsString("VALIDATION ERROR"));
+        assertThat(e.getMessage(), containsString("Column 'full_name123' not 
found in any table"));
+      }
+    }
+
+  @Test
+  public void testSelectLiterals() throws Exception {
+    testBuilder()
+        .sqlQuery("select 'abc' as full_name, 123 as id from 
cassandra.test_keyspace.`employee` limit 3")
+        .unOrdered()
+        .baselineColumns("full_name", "id")
+        .baselineValues("abc", 123)
+        .baselineValues("abc", 123)
+        .baselineValues("abc", 123)
+        .go();
+  }
+
+  @Test
+  public void testSelectIntLiterals() throws Exception {
+    testBuilder()
+        .sqlQuery("select 333 as full_name, 123 as id from 
cassandra.test_keyspace.`employee` limit 3")
+        .unOrdered()
+        .baselineColumns("full_name", "id")
+        .baselineValues(333, 123)
+        .baselineValues(333, 123)
+        .baselineValues(333, 123)
+        .go();
+  }
+
+  @Test
+  public void testSelectLiteralWithStar() throws Exception {
+    testBuilder()
+        .sqlQuery("select *, 123 as full_name from 
cassandra.test_keyspace.`employee` order by employee_id limit 3")
+        .unOrdered()
+        .baselineColumns("full_name")
+        .baselineColumns("employee_id", "full_name", "first_name", 
"last_name", "position_id",
+            "position_title", "store_id", "department_id", "birth_date", 
"hire_date", "salary",
+            "supervisor_id", "education_level", "marital_status", "gender", 
"management_role", "full_name0")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 
0, 1, "1961-08-26", "1994-12-01 00:00:00.0", 80000.0f, 0, "Graduate Degree", 
"S", "F", "Senior Management", 123)
+        .baselineValues(2L, "Derrick Whelply", "Derrick", "Whelply", 2, "VP 
Country Manager", 0, 1, "1915-07-03", "1994-12-01 00:00:00.0", 40000.0f, 1, 
"Graduate Degree", "M", "M", "Senior Management", 123)
+        .baselineValues(4L, "Michael Spence", "Michael", "Spence", 2, "VP 
Country Manager", 0, 1, "1969-06-20", "1998-01-01 00:00:00.0", 40000.0f, 1, 
"Graduate Degree", "S", "M", "Senior Management", 123)
+        .go();
+  }
+
+  @Test
+  public void testLiteralWithAggregateAndGroupBy() throws Exception {
+    testBuilder()
+        .sqlQuery("select sum(`salary`) sal, 1 as department_id from 
cassandra.test_keyspace.`employee` e group by e.`department_id`")
+        .unOrdered()
+        .baselineColumns("sal", "department_id")
+        .baselineValues(195000.0, 1)
+        .baselineValues(42000.0, 1)
+        .baselineValues(25000.0, 1)
+        .baselineValues(15000.0, 1)
+        .baselineValues(50000.0, 1)
+        .go();
+  }
+
+  @Test
+  public void testSelectNonExistingTable() throws Exception {
+    try {
+      queryBuilder().sql("select full_name from 
cassandra.test_keyspace.`non-existing`").run();
+      fail("Query didn't fail");
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString("VALIDATION ERROR"));
+      assertThat(e.getMessage(), containsString("Object 'non-existing' not 
found within 'cassandra.test_keyspace'"));
+    }
+  }
+
+  @Test
+  public void testSelectNonExistingSubSchema() throws Exception {
+    try {
+      queryBuilder().sql("select full_name from 
cassandra.test_keyspace.`non-existing`.employee").run();
+      fail("Query didn't fail");
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString("VALIDATION ERROR"));
+      assertThat(e.getMessage(), containsString("Schema [[cassandra, 
test_keyspace, non-existing]] is not valid with respect to either root schema 
or current default schema"));
+    }
+  }
+
+  @Test
+  public void testWithProvidedSchema() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from " +
+                
"table(cassandra.test_keyspace.`employee`(schema=>'inline=(birth_date date not 
null, salary decimal(10, 2))')) " +
+                "where first_name = 'Sheri'")
+        .ordered()
+        .baselineColumns("employee_id", "full_name", "first_name", 
"last_name", "position_id",
+            "position_title", "store_id", "department_id", "birth_date", 
"hire_date", "salary",
+            "supervisor_id", "education_level", "marital_status", "gender", 
"management_role")
+        .baselineValues(1L, "Sheri Nowmer", "Sheri", "Nowmer", 1, "President", 
0, 1, LocalDate.parse("1961-08-26"),
+            "1994-12-01 00:00:00.0", new BigDecimal("80000.00"), 0, "Graduate 
Degree", "S", "F", "Senior Management")
+        .go();
+  }
+}
diff --git a/contrib/storage-cassandra/src/test/resources/queries.cql 
b/contrib/storage-cassandra/src/test/resources/queries.cql
new file mode 100644
index 0000000..534a604
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/resources/queries.cql
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE KEYSPACE test_keyspace
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+
+CREATE TABLE test_keyspace.employee (
+    employee_id bigint,
+    full_name text,
+    first_name text,
+    last_name text,
+    position_id int,
+    position_title text,
+    store_id int,
+    department_id int,
+    birth_date text,
+    hire_date text,
+    salary float,
+    supervisor_id int,
+    education_level text,
+    marital_status text,
+    gender text,
+    management_role text,
+    PRIMARY KEY (full_name, employee_id)
+) WITH CLUSTERING ORDER BY (employee_id ASC);
+
+USE test_keyspace;
+
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (1, 'Sheri Nowmer', 'Sheri', 'Nowmer', 1, 'President',0,1, 
'1961-08-26', '1994-12-01 00:00:00.0',80000.0 ,0, 'Graduate Degree', 'S', 'F', 
'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (2, 'Derrick Whelply', 'Derrick', 'Whelply', 2, 'VP Country 
Manager',0,1, '1915-07-03', '1994-12-01 00:00:00.0',40000.0 ,1, 'Graduate 
Degree', 'M', 'M', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (4, 'Michael Spence', 'Michael', 'Spence', 2, 'VP Country 
Manager',0,1, '1969-06-20', '1998-01-01 00:00:00.0',40000.0 ,1, 'Graduate 
Degree', 'S', 'M', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (5, 'Maya Gutierrez', 'Maya', 'Gutierrez', 2, 'VP Country 
Manager',0,1, '1951-05-10', '1998-01-01 00:00:00.0',35000.0 ,1, 'Bachelors 
Degree', 'M', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (6, 'Roberta Damstra', 'Roberta', 'Damstra', 3, 'VP Information 
Systems',0,2, '1942-10-08', '1994-12-01 00:00:00.0',25000.0 ,1, 'Bachelors 
Degree', 'M', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (7, 'Rebecca Kanagaki', 'Rebecca', 'Kanagaki', 4, 'VP Human 
Resources',0,3, '1949-03-27', '1994-12-01 00:00:00.0',15000.0 ,1, 'Bachelors 
Degree', 'M', 'F', 'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (8, 'Kim Brunner', 'Kim', 'Brunner', 11, 'Store Manager',9,11, 
'1922-08-10', '1998-01-01 00:00:00.0',10000.0 ,5, 'Bachelors Degree', 'S', 'F', 
'Store Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (9, 'Brenda Blumberg', 'Brenda', 'Blumberg', 11, 'Store 
Manager',21,11, '1979-06-23', '1998-01-01 00:00:00.0',17000.0 ,5, 'Graduate 
Degree', 'M', 'F', 'Store Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (10, 'Darren Stanz', 'Darren', 'Stanz', 5, 'VP Finance',0,5, 
'1949-08-26', '1994-12-01 00:00:00.0',50000.0 ,1, 'Partial College', 'M', 'M', 
'Senior Management');
+INSERT INTO employee(employee_id, full_name, first_name, last_name, 
position_id, position_title, store_id, department_id, birth_date, hire_date, 
salary, supervisor_id, education_level, marital_status, gender, management_role)
+ VALUES (11, 'Jonathan Murraiin', 'Jonathan', 'Murraiin', 11, 'Store 
Manager',1,11, '1967-06-20', '1998-01-01 00:00:00.0',15000.0 ,5, 'Graduate 
Degree', 'S', 'M', 'Store Management');
+
+CREATE TABLE test_keyspace.nation (
+    n_nationkey bigint,
+    n_name text,
+    n_regionkey int,
+    PRIMARY KEY (n_nationkey, n_regionkey)
+) WITH CLUSTERING ORDER BY (n_regionkey ASC);
+
+INSERT INTO nation(n_nationkey, n_name, n_regionkey)
+VALUES (0, 'ALGERIA', 1);
+
+CREATE TABLE arr (
+    f_int int PRIMARY KEY,
+    string_arr list<text>,
+    int_arr list<int>,
+    int_set set<int>
+);
+
+INSERT INTO arr (f_int, string_arr, int_arr, int_set)
+VALUES (0, ['a', 'b', 'c', 'd'], [1, 2, 3, 4, 0], {9, 8, 7, 6, 5});
+
+CREATE TABLE map (
+    prim_field int PRIMARY KEY,
+    nest_field map<text, text>,
+    more_nest_field map<text, frozen<map<text, text>>>,
+    map_arr list<frozen<map<text, int>>>
+);
+
+INSERT INTO map (prim_field, nest_field, more_nest_field, map_arr)
+VALUES (0, {'a':'123', 'b':'abc'}, {'a':{'b':'abc'}}, [{'a':123, 'b':321}, 
{'c':456, 'd':789}]);
+
+CREATE TABLE tuple (
+    prim_field int PRIMARY KEY,
+    tuple_field tuple<int, text, double>
+);
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 6adce9d..46323a1 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -334,6 +334,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-storage-cassandra</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-storage-elasticsearch</artifactId>
           <version>${project.version}</version>
         </dependency>
diff --git a/distribution/src/assemble/component.xml 
b/distribution/src/assemble/component.xml
index 334ff4e..e140056 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -55,6 +55,7 @@
         <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
         <include>org.apache.drill.contrib:drill-storage-kafka:jar</include>
         
<include>org.apache.drill.contrib:drill-storage-elasticsearch:jar</include>
+        <include>org.apache.drill.contrib:drill-storage-cassandra:jar</include>
         <include>org.apache.drill.contrib:drill-storage-http:jar</include>
         <include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
         <include>org.apache.drill.contrib:drill-udfs:jar</include>
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
index 3688972..d81db76 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,7 +102,14 @@ public interface ColumnConverter {
         return;
       }
 
-      Iterable<?> array = (Iterable<?>) value;
+      Iterable<?> array;
+      if (value instanceof Iterable) {
+        array = (Iterable<?>) value;
+      } else if (value.getClass().isArray()) {
+        array = Arrays.asList(((Object[]) value));
+      } else {
+        throw new IllegalStateException("Invalid value type for list 
ArrayColumnConverter: " + value.getClass());
+      }
       array.forEach(arrayValue -> {
         valueConverter.convert(arrayValue);
         arrayWriter.save();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
index 43b9216..83f8f72 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrelContext.java
@@ -24,7 +24,7 @@ import java.util.Map;
 
 public interface EnumerablePrelContext {
 
-  String generateCode(RelOptCluster cluster, RelNode elasticNode);
+  String generateCode(RelOptCluster cluster, RelNode relNode);
 
   RelNode transformNode(RelNode input);
 
diff --git a/pom.xml b/pom.xml
index 105d509..fb0412c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
       avoid_bad_dependencies plugin found in the file.
     -->
     <calcite.groupId>com.github.vvysotskyi.drill-calcite</calcite.groupId>
-    <calcite.version>1.21.0-drill-r1</calcite.version>
+    <calcite.version>1.21.0-drill-r2</calcite.version>
     <avatica.version>1.15.0</avatica.version>
     <janino.version>3.0.11</janino.version>
     <sqlline.version>1.9.0</sqlline.version>

Reply via email to