[ 
https://issues.apache.org/jira/browse/DRILL-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263756#comment-17263756
 ] 

ASF GitHub Bot commented on DRILL-3637:
---------------------------------------

vvysotskyi commented on a change in pull request #2135:
URL: https://github.com/apache/drill/pull/2135#discussion_r556107998



##########
File path: contrib/storage-elasticsearch/pom.xml
##########
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <properties>
+    <test.elasticsearch.version>7.10.1</test.elasticsearch.version>
+  </properties>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.19.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-elasticsearch</artifactId>
+
+  <name>contrib/elasticsearch-storage-plugin</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${calcite.groupId}</groupId>
+      <artifactId>calcite-elasticsearch</artifactId>
+      <version>${calcite.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion></exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <version>7.0.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.hydromatic</groupId>
+      <artifactId>foodmart-data-json</artifactId>
+      <version>0.4</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkCount combine.self="override">1</forkCount>

Review comment:
       Elastic tests are running using the real elasticsearch instance started 
by the `elasticsearch-maven-plugin` plugin. Since every test class required 
preparing its data, running tests concurrently may cause test data corruption 
and therefore random failures, so they are running in a single thread.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(ElasticsearchStorageConfig.NAME)
+public class ElasticsearchStorageConfig extends StoragePluginConfig {
+  public static final String NAME = "elastic";
+
+  private static final ObjectWriter OBJECT_WRITER = new 
ObjectMapper().writerFor(List.class);
+
+  private final List<String> hosts;
+  private final String username;
+  private final String password;
+
+  @JsonCreator

Review comment:
       We should satisfy prerequisites to be able to do that. From the article:
   
   > The requirements for that are :
   > 
   > 1. JDK 1.8
   > 2. compile with -parameters argument
   > 3. use and register jackson-module-parameter-names
   
   Currently, we have satisfied only the first one...

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/CalciteUtils.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.store.elasticsearch.ElasticsearchStorageConfig;
+import 
org.apache.drill.exec.store.elasticsearch.plan.ElasticSearchEnumerablePrelContext;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticsearchFilterRule;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticsearchProjectRule;
+import 
org.apache.drill.exec.store.enumerable.plan.EnumerableIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class CalciteUtils {
+
+  private static final List<String> BANNED_RULES =
+      Arrays.asList("ElasticsearchProjectRule", "ElasticsearchFilterRule");
+
+  public static final Predicate<RelOptRule> RULE_PREDICATE =
+      relOptRule -> BANNED_RULES.stream()
+          .noneMatch(banned -> relOptRule.toString().startsWith(banned));
+
+  public static final VertexDrelConverterRule ELASTIC_DREL_CONVERTER_RULE =
+      new VertexDrelConverterRule(ElasticsearchRel.CONVENTION);
+
+  public static final EnumerableIntermediatePrelConverterRule 
ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE =
+      new EnumerableIntermediatePrelConverterRule(
+          new 
ElasticSearchEnumerablePrelContext(ElasticsearchStorageConfig.NAME));
+
+  public static Set<RelOptRule> elasticSearchRules() {
+    Set<RelOptRule> rules = Arrays.stream(ElasticsearchRules.RULES)
+        .filter(RULE_PREDICATE)

Review comment:
       We filter Calcite implementations of these rules and add our custom 
versions later.

##########
File path: 
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
##########
@@ -60,7 +61,7 @@
     this.rules = ImmutableSet.<RelOptRule>builder()
         .addAll(calciteJdbcRules)
         .add(JdbcIntermediatePrelConverterRule.INSTANCE)
-        .add(new JdbcDrelConverterRule(this))
+        .add(new VertexDrelConverterRule(this))

Review comment:
       Yes, this rule is used for several plugins.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
##########
@@ -144,21 +143,22 @@ public ClassBuilder(DrillConfig config, OptionSet 
optionManager) {
       saveCode(code, name);
     }
 
+    Class<?> compiledClass = getCompiledClass(code, className, config, 
options);
+    logger.debug("Compiled {}: time = {} ms.",
+        className,
+        (System.nanoTime() - t1 + 500_000) / 1_000_000);
+    return compiledClass;
+  }
+
+  public static Class<?> getCompiledClass(String code, String className,

Review comment:
       I had to refactor this class to use some logic in 
`EnumerableRecordReader` instead of code duplication.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/ConvertIntToDecimal.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.convert;
+
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.math.BigDecimal;
+
+public class ConvertIntToDecimal extends DirectConverter {

Review comment:
       It is not for auto casting. It is used for the conversion of the values 
of int type to decimal when using `ColumnConverter` since implicitly it cannot 
be done due to class cast exceptions.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/SubsetRemover.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.RelShuttleImpl;
+
+public class SubsetRemover extends RelShuttleImpl {

Review comment:
       Thanks, added.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
##########
@@ -110,14 +108,14 @@ public void onMatch(RelOptRuleCall call) {
         return;
       }
 
-      DrillScanRelBase newScan = createScan(scan, projectPushInfo);
+      TableScan newScan = createScan(scan, projectPushInfo);
 
       List<RexNode> newProjects = new ArrayList<>();
       for (RexNode n : project.getChildExps()) {
         newProjects.add(n.accept(projectPushInfo.getInputReWriter()));
       }
 
-      DrillProjectRelBase newProject =
+      Project newProject =

Review comment:
       With this change, this rule may be applicable to other implementations, 
there is no specific code that requires Drill implementation, so we shouldn't 
enforce it to have them.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/NodeTypeFinder.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.util.Util;
+
+public class NodeTypeFinder extends RelShuttleImpl {

Review comment:
       Currently, we use it for ElasticSearch. If someone will require this 
class, it can be moved to the common package later.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite 
ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from 
ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be 
null");
+    RelDataType rowType = 
elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), 
other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns 
results as a map
+    if (other instanceof ElasticsearchProject) {
+      ElasticsearchProject project = (ElasticsearchProject) other;
+      RelNode input = project.getInput().accept(this);
+      List<RexNode> convertedExpressions = project.getProjects();
+      // project closest to the scan should be rewritten only
+      if (!this.hasProject) {
+        ElasticExpressionMapper expressionMapper =
+            new ElasticExpressionMapper(project.getCluster().getRexBuilder(),
+                project.getInput().getRowType(), mapField);
+        convertedExpressions = convertedExpressions.stream()
+            .map(expression -> expression.accept(expressionMapper))
+            .collect(Collectors.toList());
+
+        RelRecordType relDataType = getRelRecordType(other.getRowType());
+        this.hasProject = true;
+        return CalciteUtils.createProject(project.getTraitSet(), input,
+            convertedExpressions, relDataType);
+      } else {
+        return input;
+      }
+    } else if (other instanceof ElasticsearchFilter) {
+      ElasticsearchFilter filter = (ElasticsearchFilter) other;
+      RexNode convertedCondition = filter.getCondition().accept(
+          new ElasticExpressionMapper(other.getCluster().getRexBuilder(), 
filter.getInput().getRowType(), mapField));
+      return filter.copy(other.getTraitSet(), filter.getInput().accept(this), 
convertedCondition);
+    } else if (other instanceof ElasticsearchSort) {
+      ElasticsearchSort sort = (ElasticsearchSort) other;
+      RelNode input = getMappedInput(sort.getInput());
+      return sort.copy(other.getTraitSet(), input, sort.getCollation(), 
sort.offset, sort.fetch);
+    } else if (other instanceof ElasticsearchAggregate) {
+      ElasticsearchAggregate aggregate = (ElasticsearchAggregate) other;
+      RelNode input = getMappedInput(aggregate.getInput());
+      return aggregate.copy(other.getTraitSet(), input, 
aggregate.getGroupSet(),
+          aggregate.getGroupSets(), aggregate.getAggCallList());
+    }
+
+    return super.visit(other);
+  }
+
+  /**
+   * Generates project with mapped expressions above specified rel node
+   * if there is no other project in the tree.

Review comment:
       Drill can generate a plan that may not contain the elastic project, so 
we ensure that it will be present.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchFilterRule.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchFilterRule extends ConverterRule {
+  private static final Logger logger = 
LoggerFactory.getLogger(ElasticsearchFilterRule.class);
+
+  public static final ElasticsearchFilterRule INSTANCE = new 
ElasticsearchFilterRule();
+
+  private final Convention out;
+
+  private ElasticsearchFilterRule() {
+    super(Filter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "DrillElasticsearchFilterRule");
+    this.out = ElasticsearchRel.CONVENTION;
+  }
+
+  @Override
+  public RelNode convert(RelNode relNode) {
+    Filter filter = (Filter) relNode;
+    NodeTypeFinder filterFinder = new 
NodeTypeFinder(ElasticsearchFilter.class);
+    filter.getInput().accept(filterFinder);
+    if (filterFinder.containsNode) {
+      return null;
+    }
+    RelTraitSet traitSet = filter.getTraitSet().replace(out);
+
+    try {
+      CalciteUtils.analyzePredicate(filter.getCondition());
+    } catch (Exception e) {
+      logger.info("Unable to push filter into ElasticSearch :{}", 
e.getMessage(), e);

Review comment:
       No, it is not an error. We just check whether Calcite can convert filter 
to elasticsearch filter.
   Calcite enforces us to use exceptions as the control flow antipattern here...

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchFilterRule.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchFilterRule extends ConverterRule {
+  private static final Logger logger = 
LoggerFactory.getLogger(ElasticsearchFilterRule.class);
+
+  public static final ElasticsearchFilterRule INSTANCE = new 
ElasticsearchFilterRule();
+
+  private final Convention out;
+
+  private ElasticsearchFilterRule() {
+    super(Filter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "DrillElasticsearchFilterRule");
+    this.out = ElasticsearchRel.CONVENTION;
+  }
+
+  @Override
+  public RelNode convert(RelNode relNode) {
+    Filter filter = (Filter) relNode;
+    NodeTypeFinder filterFinder = new 
NodeTypeFinder(ElasticsearchFilter.class);
+    filter.getInput().accept(filterFinder);
+    if (filterFinder.containsNode) {
+      return null;

Review comment:
       No, we don't fail the query here, we just do not convert the filter to 
the elastic filter.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite 
ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from 
ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be 
null");
+    RelDataType rowType = 
elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), 
other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns 
results as a map

Review comment:
       Thanks, reworded.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite 
ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from 
ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be 
null");
+    RelDataType rowType = 
elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), 
other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns 
results as a map
+    if (other instanceof ElasticsearchProject) {
+      ElasticsearchProject project = (ElasticsearchProject) other;
+      RelNode input = project.getInput().accept(this);
+      List<RexNode> convertedExpressions = project.getProjects();
+      // project closest to the scan should be rewritten only
+      if (!this.hasProject) {
+        ElasticExpressionMapper expressionMapper =
+            new ElasticExpressionMapper(project.getCluster().getRexBuilder(),
+                project.getInput().getRowType(), mapField);
+        convertedExpressions = convertedExpressions.stream()
+            .map(expression -> expression.accept(expressionMapper))
+            .collect(Collectors.toList());
+
+        RelRecordType relDataType = getRelRecordType(other.getRowType());
+        this.hasProject = true;
+        return CalciteUtils.createProject(project.getTraitSet(), input,
+            convertedExpressions, relDataType);
+      } else {
+        return input;
+      }
+    } else if (other instanceof ElasticsearchFilter) {
+      ElasticsearchFilter filter = (ElasticsearchFilter) other;
+      RexNode convertedCondition = filter.getCondition().accept(
+          new ElasticExpressionMapper(other.getCluster().getRexBuilder(), 
filter.getInput().getRowType(), mapField));
+      return filter.copy(other.getTraitSet(), filter.getInput().accept(this), 
convertedCondition);
+    } else if (other instanceof ElasticsearchSort) {
+      ElasticsearchSort sort = (ElasticsearchSort) other;
+      RelNode input = getMappedInput(sort.getInput());
+      return sort.copy(other.getTraitSet(), input, sort.getCollation(), 
sort.offset, sort.fetch);
+    } else if (other instanceof ElasticsearchAggregate) {
+      ElasticsearchAggregate aggregate = (ElasticsearchAggregate) other;
+      RelNode input = getMappedInput(aggregate.getInput());
+      return aggregate.copy(other.getTraitSet(), input, 
aggregate.getGroupSet(),
+          aggregate.getGroupSets(), aggregate.getAggCallList());
+    }
+
+    return super.visit(other);
+  }
+
+  /**
+   * Generates project with mapped expressions above specified rel node
+   * if there is no other project in the tree.
+   */
+  private RelNode getMappedInput(RelNode relNode) {
+    boolean hasProject = this.hasProject;
+    this.hasProject = false;
+    RelNode input = relNode.accept(this);
+    if (!this.hasProject) {
+      this.hasProject = hasProject;
+      RelOptCluster cluster = relNode.getCluster();
+      List<RexNode> projections = IntStream.range(0, 
relNode.getRowType().getFieldCount())
+          .mapToObj(i -> cluster.getRexBuilder().makeInputRef(relNode, i))
+          .collect(Collectors.toList());
+
+      return CalciteUtils.createProject(relNode.getTraitSet(), relNode,
+          projections, relNode.getRowType()).accept(this);
+    } else {
+      return input;
+    }
+  }
+
+  private RelRecordType getRelRecordType(RelDataType rowType) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    for (RelDataTypeField relDataTypeField : rowType.getFieldList()) {
+      if (relDataTypeField.isDynamicStar()) {
+        fields.add(mapField);
+      } else {
+        fields.add(relDataTypeField);
+      }
+    }
+
+    return new RelRecordType(StructKind.FULLY_QUALIFIED, fields, false);
+  }
+
+  /**
+   * Implementation of RexShuttle that replaces RexInputRef expressions with 
ITEM calls to _MAP field.
+   */
+  public static class ElasticExpressionMapper extends RexShuttle {
+    private final RexBuilder rexBuilder;
+    private final RelDataType relDataType;
+    private final RelDataTypeField mapField;
+
+    public ElasticExpressionMapper(RexBuilder rexBuilder, RelDataType 
relDataType, RelDataTypeField mapField) {
+      this.rexBuilder = rexBuilder;
+      this.relDataType = relDataType;
+      this.mapField = mapField;
+    }
+
+    @Override
+    public RexNode visitInputRef(RexInputRef inputRef) {
+      if (inputRef.getType().getSqlTypeName() == SqlTypeName.DYNAMIC_STAR) {
+        return rexBuilder.makeInputRef(mapField.getType(), 0);
+      }
+      return rexBuilder.makeCall(SqlStdOperatorTable.ITEM, 
rexBuilder.makeInputRef(relDataType, 0),
+          
rexBuilder.makeLiteral(relDataType.getFieldNames().get(inputRef.getIndex())));
+    }
+  }
+}

Review comment:
       No, it can't be simplified. Returning `_MAP` field instead of generating 
`ITEM` calls will enforce users to specify these `ITEM` calls in the query to 
access desired fields. 

##########
File path: 
contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class ElasticSearchPlanTest extends ClusterTest {

Review comment:
       I'm not sure that `ElasticSearchClusterTest` will fit here since we do 
not start ElasticSearch from the Java code but from the maven plugin. Also, 
every test class has its specific generated data.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchProjectRule.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ElasticsearchProjectRule extends ConverterRule {

Review comment:
       The corresponding rule from the ElasticSearch adapter is much simpler, 
it just converts the projection expressions to the elasticsearch expressions, 
which may fail later. But here we have a logic to split the project if it would 
have expressions. Also, using this rule, we avoid the 
https://issues.apache.org/jira/browse/CALCITE-4440 issue.

##########
File path: 
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
##########
@@ -78,19 +77,6 @@ private String stripToOneLineSql(String sql) {
     return strippedSqlTextBldr.toString();
   }
 
-  private static class SubsetRemover extends RelShuttleImpl {

Review comment:
       It was moved to java-exec package to be able to use it in other storage 
plugins.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
##########
@@ -257,8 +258,9 @@ public ConversionDefn analyze(MinorType inputType, 
ColumnMetadata outputSchema)
         case BIGINT:
         case FLOAT4:
         case FLOAT8:
-        case VARDECIMAL:
           return IMPLICIT;
+        case VARDECIMAL:
+          return new ConversionDefn(ConvertIntToDecimal.class);

Review comment:
       These both different terms. The existing one `ConversionDefn` is the 
"Definition of a conversion" and that's what the class does. But the argument 
is a column schema, which is also fine. So leaving it as it is.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
##########
@@ -203,9 +203,9 @@ public DirectConverter newInstance(
    * <p>
    * Does not support any of the "legacy" decimal types.
    *
-   * @param inputDefn the column schema for the input column which the
+   * @param inputSchema the column schema for the input column which the

Review comment:
       `inputSchema` fits better for Drill, since we have our own terms and use 
them widely in the documentation. Please note, that I've just updated JavaDoc 
to correspond to actual method parameters.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
##########
@@ -27,19 +25,21 @@
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
 
-public class JdbcDrel extends SingleRel implements DrillRel {
+import java.util.List;
+
+public class VertexDrel extends SingleRel implements DrillRel {

Review comment:
       Thanks, added javadoc.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+public interface LeafPrel extends Prel {

Review comment:
       It is not related to fragments. It is a prel without children, so leaf. 
Thanks, added javadoc.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Converts and sets given value into the specific column writer.
+ */
+public interface ColumnConverter {

Review comment:
       `ColumnConverter` is a handy way of converting and setting values to the 
specific column writer. Yes, we use it also for ElasticSearch. Please take a 
look at the JavaDocs of its implementations, they provide all required 
additional info.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
##########
@@ -284,7 +284,7 @@ public static boolean isLimit0(RexNode fetch) {
   public static boolean isProjectOutputRowcountUnknown(Project project) {
     for (RexNode rex : project.getProjects()) {
       if (rex instanceof RexCall) {
-        if ("flatten".equals(((RexCall) 
rex).getOperator().getName().toLowerCase())) {
+        if ("flatten".equalsIgnoreCase(((RexCall) 
rex).getOperator().getName())) {

Review comment:
       We can't control the casing of the expressions, some of them are 
generated from Calcite.

##########
File path: 
contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import 
org.apache.drill.exec.store.elasticsearch.schema.ElasticsearchDrillSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ElasticsearchStoragePlugin extends AbstractStoragePlugin {
+  private final ElasticsearchStorageConfig config;
+  private final ElasticsearchDrillSchemaFactory schemaFactory;
+
+  public ElasticsearchStoragePlugin(
+      ElasticsearchStorageConfig config, DrillbitContext context, String name) 
{
+    super(context, name);
+    this.config = config;
+    this.schemaFactory = new ElasticsearchDrillSchemaFactory(name, this);
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws JsonProcessingException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public ElasticsearchStorageConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext 
optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+        return Collections.emptySet();

Review comment:
       Oh, no, it is because of experimenting with the rules. I've reordered 
switch branches, so we will use `Collections.emptySet()`, which actually is 
also immutable.

##########
File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
##########
@@ -708,7 +708,7 @@ public void testBasicConversionType() {
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, 
bigIntCol));
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, 
float4Col));
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, 
float8Col));
-    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, 
decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(tinyIntCol, 
decimalCol));

Review comment:
       Because actually, it is impossible to implicitly convert int value to 
decimal by column writers.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DrillDataContext.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.enumerable;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.util.Map;
+
+public class DrillDataContext implements DataContext {
+  private final SchemaPlus rootSchema;

Review comment:
       `DataContextImpl` is not public, so can't be used. 
`DataContext.getRootSchema()` returns `SchemaPlus`, so no need to hold 
`CalciteSchema`.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
##########
@@ -210,16 +209,6 @@ protected DrillRel convertToDrel(RelNode relNode, 
AbstractSchema schema, String
     return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), 
writerRel);
   }
 
-  public static DrillScanRel findScan(RelNode... rels) {
-    for (RelNode rel : rels) {
-      if (rel instanceof DrillScanRel) {
-        return (DrillScanRel) rel;
-      } else {
-        return findScan(rel.getInputs().toArray(new RelNode[0]));
-      }
-    }
-    return null;
-  }
   // Make sure no unsupported features in ANALYZE statement are used

Review comment:
       I think it should be done out of this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Add Elasticsearch Storage Plugin
> --------------------------------
>
>                 Key: DRILL-3637
>                 URL: https://issues.apache.org/jira/browse/DRILL-3637
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components: Storage - ElasticSearch
>            Reporter: Andrew
>            Assignee: Vova Vysotskyi
>            Priority: Major
>             Fix For: 1.19.0
>
>
> Create a storage plugin for elasticsearch



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to