[CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)

Aggregate functions (count/sum/min/max/avg) are pushed down to ES.

Add ElasticsearchAggregate relational expression to convert SQL into native 
Elastic aggregations (value_count, min, max etc.).
Enhance ElasticsearchTable to prepare correct aggregate ES JSON query.

Create special classes to parse recursively elastic aggregation response or 
buckets (located in ElasticJson). They're inspired from existing Elastic 
high-level client source.

For tests, make Json input more human friendly. Single quotes are accepted and 
fields can be unquoted (unless
they contain special characters). Also field with dots 'a.b.c' are 
automatically auto-expanded. This reduces JSON noise.

Fix single projections which previously returned map (see [CALCITE-2485])

Close apache/calcite#801
Close apache/calcite#822


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/79af1c9b
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/79af1c9b
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/79af1c9b

Branch: refs/heads/master
Commit: 79af1c9ba735286653697deed3ff849b7c921fe4
Parents: ce05146
Author: Andrei Sereda <25229979+asereda...@users.noreply.github.com>
Authored: Tue Sep 18 22:53:24 2018 -0400
Committer: Andrei Sereda <25229979+asereda...@users.noreply.github.com>
Committed: Tue Sep 18 22:53:24 2018 -0400

----------------------------------------------------------------------
 elasticsearch/pom.xml                           |   6 +
 .../AbstractElasticsearchTable.java             | 150 -----
 .../elasticsearch/ElasticsearchAggregate.java   | 165 +++++
 .../elasticsearch/ElasticsearchConstants.java   |   9 -
 .../elasticsearch/ElasticsearchEnumerators.java |  44 +-
 .../elasticsearch/ElasticsearchFilter.java      |  17 +-
 .../elasticsearch/ElasticsearchJson.java        | 614 +++++++++++++++++++
 .../elasticsearch/ElasticsearchMethod.java      |  13 +-
 .../elasticsearch/ElasticsearchProject.java     |   6 +-
 .../adapter/elasticsearch/ElasticsearchRel.java |  66 +-
 .../elasticsearch/ElasticsearchRules.java       |  38 +-
 .../elasticsearch/ElasticsearchSchema.java      |  30 +-
 .../elasticsearch/ElasticsearchSort.java        |  41 +-
 .../elasticsearch/ElasticsearchTable.java       | 313 +++++++++-
 .../elasticsearch/ElasticsearchTableScan.java   |   6 +-
 .../ElasticsearchToEnumerableConverter.java     |  46 +-
 .../elasticsearch/PredicateAnalyzer.java        |  10 +
 .../adapter/elasticsearch/QueryBuilders.java    | 106 +++-
 .../adapter/elasticsearch/AggregationTest.java  | 235 +++++++
 .../adapter/elasticsearch/BooleanLogicTest.java |   1 +
 .../elasticsearch/ElasticSearchAdapterTest.java | 309 +++++++---
 .../elasticsearch/ElasticsearchJsonTest.java    | 183 ++++++
 .../EmbeddedElasticsearchPolicy.java            |  41 +-
 .../adapter/elasticsearch/Projection2Test.java  | 107 ++++
 .../adapter/elasticsearch/ProjectionTest.java   |  37 +-
 .../elasticsearch/QueryBuildersTest.java        |  63 ++
 .../calcite/test/ElasticsearchChecker.java      |  90 ++-
 27 files changed, 2340 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index e3a044d..4700fee 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -124,6 +124,12 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
deleted file mode 100644
index 1a0f6d0..0000000
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.elasticsearch;
-
-import org.apache.calcite.adapter.java.AbstractQueryableTable;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.TranslatableTable;
-import org.apache.calcite.schema.impl.AbstractTableQueryable;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Table based on an Elasticsearch type.
- */
-abstract class AbstractElasticsearchTable extends AbstractQueryableTable
-    implements TranslatableTable {
-
-  final String indexName;
-  final String typeName;
-  final ObjectMapper mapper;
-
-  /**
-   * Creates an ElasticsearchTable.
-   * @param indexName Elastic Search index
-   * @param typeName Elastic Search index type
-   * @param mapper Jackson API to parse (and created) JSON documents
-   */
-  AbstractElasticsearchTable(String indexName, String typeName, ObjectMapper 
mapper) {
-    super(Object[].class);
-    this.indexName = Objects.requireNonNull(indexName, "indexName");
-    this.typeName = Objects.requireNonNull(typeName, "typeName");
-    this.mapper = Objects.requireNonNull(mapper, "mapper");
-  }
-
-  @Override public String toString() {
-    return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
-  }
-
-  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
-    final RelDataType mapType = relDataTypeFactory.createMapType(
-        relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
-        relDataTypeFactory.createTypeWithNullability(
-            relDataTypeFactory.createSqlType(SqlTypeName.ANY),
-            true));
-    return relDataTypeFactory.builder().add("_MAP", mapType).build();
-  }
-
-  public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus 
schema,
-      String tableName) {
-    return new ElasticsearchQueryable<>(queryProvider, schema, this, 
tableName);
-  }
-
-  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable 
relOptTable) {
-    final RelOptCluster cluster = context.getCluster();
-    return new ElasticsearchTableScan(cluster, 
cluster.traitSetOf(ElasticsearchRel.CONVENTION),
-        relOptTable, this, null);
-  }
-
-  /**
-   * In ES 5.x scripted fields start with {@code params._source.foo} while in 
ES2.x
-   * {@code _source.foo}. Helper method to build correct query based on 
runtime version of elastic.
-   * Used to keep backwards compatibility with ES2.
-   *
-   * @see <a 
href="https://github.com/elastic/elasticsearch/issues/20068";>_source 
variable</a>
-   * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html";>Scripted
 Fields</a>
-   * @return string to be used for scripted fields
-   */
-  protected abstract String scriptedFieldPrefix();
-
-  /** Executes a "find" operation on the underlying type.
-   *
-   * <p>For example,
-   * <code>client.prepareSearch(index).setTypes(type)
-   * .setSource("{\"fields\" : [\"state\"]}")</code></p>
-   *
-   * @param index Elasticsearch index
-   * @param ops List of operations represented as Json strings.
-   * @param fields List of fields to project; or null to return map
-   * @return Enumerator of results
-   */
-  protected abstract Enumerable<Object> find(String index, List<String> ops,
-      List<Map.Entry<String, Class>> fields);
-
-  /**
-   * Implementation of {@link Queryable} based on
-   * a {@link AbstractElasticsearchTable}.
-   *
-   * @param <T> element type
-   */
-  public static class ElasticsearchQueryable<T> extends 
AbstractTableQueryable<T> {
-    ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
-        AbstractElasticsearchTable table, String tableName) {
-      super(queryProvider, schema, table, tableName);
-    }
-
-    public Enumerator<T> enumerator() {
-      return null;
-    }
-
-    private String getIndex() {
-      return schema.unwrap(ElasticsearchSchema.class).getIndex();
-    }
-
-    private AbstractElasticsearchTable getTable() {
-      return (AbstractElasticsearchTable) table;
-    }
-
-    /** Called via code-generation.
-     * @param ops list of queries (as strings)
-     * @param fields projection
-     * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
-     * @return result as enumerable
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public Enumerable<Object> find(List<String> ops,
-        List<Map.Entry<String, Class>> fields) {
-      return getTable().find(getIndex(), ops, fields);
-    }
-  }
-}
-
-// End AbstractElasticsearchTable.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
new file mode 100644
index 0000000..9627aca
--- /dev/null
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+/**
+ * Implementation of
+ * {@link org.apache.calcite.rel.core.Aggregate} relational expression
+ * for ElasticSearch.
+ */
+public class ElasticsearchAggregate extends Aggregate implements 
ElasticsearchRel {
+
+  private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
+      EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG, 
SqlKind.SUM);
+
+  /** Creates a ElasticsearchAggregate */
+  ElasticsearchAggregate(RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      boolean indicator,
+      ImmutableBitSet groupSet,
+      List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls) throws InvalidRelException  {
+    super(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls);
+
+    if (getConvention() != input.getConvention()) {
+      String message = String.format(Locale.ROOT, "%s != %s", getConvention(),
+          input.getConvention());
+      throw new AssertionError(message);
+    }
+
+    assert getConvention() == input.getConvention();
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+    assert this.groupSets.size() == 1 : "Grouping sets not supported";
+
+    for (AggregateCall aggCall : aggCalls) {
+      if (aggCall.isDistinct()) {
+        throw new InvalidRelException("distinct aggregation not supported");
+      }
+
+      SqlKind kind = aggCall.getAggregation().getKind();
+      if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
+        final String message = String.format(Locale.ROOT,
+            "Aggregation %s not supported (use one of %s)", kind, 
SUPPORTED_AGGREGATIONS);
+        throw new InvalidRelException(message);
+      }
+    }
+
+    if (getGroupType() != Group.SIMPLE) {
+      final String message = String.format(Locale.ROOT, "Only %s grouping is 
supported. "
+              + "Yours is %s", Group.SIMPLE, getGroupType());
+      throw new InvalidRelException(message);
+    }
+
+  }
+
+  @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean 
indicator,
+      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls) {
+    try {
+      return new ElasticsearchAggregate(getCluster(), traitSet, input,
+          indicator, groupSet, groupSets,
+          aggCalls);
+    } catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    List<String> inputFields = fieldNames(getInput().getRowType());
+
+    for (int group : groupSet) {
+      implementor.addGroupBy(inputFields.get(group));
+    }
+
+    for (AggregateCall aggCall : aggCalls) {
+      List<String> names = new ArrayList<>();
+      for (int i : aggCall.getArgList()) {
+        names.add(inputFields.get(i));
+      }
+
+      final String name = names.isEmpty() ? ElasticsearchConstants.ID : 
names.get(0);
+
+      String op = String.format(Locale.ROOT, "\"%s\":{\"field\": \"%s\"}",
+          toElasticAggregate(aggCall),
+          name);
+
+      implementor.addAggregation(aggCall.getName(), op);
+    }
+  }
+
+  /**
+   * Most of the aggregations can be retrieved with single
+   * <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html";>stats</a>
+   * function. But currently only one-to-one mapping is supported between sql 
agg and elastic
+   * aggregation.
+   */
+  private String toElasticAggregate(AggregateCall call) {
+    SqlKind kind = call.getAggregation().getKind();
+    switch (kind) {
+    case COUNT:
+      return call.isApproximate() ? "cardinality" : "value_count";
+    case SUM:
+      return "sum";
+    case MIN:
+      return "min";
+    case MAX:
+      return "max";
+    case AVG:
+      return "avg";
+    default:
+      throw new IllegalArgumentException("Unknown aggregation kind " + kind + 
" for " + call);
+    }
+  }
+
+  private List<String> fieldNames(RelDataType relDataType) {
+    List<String> names = new ArrayList<>();
+
+    for (RelDataTypeField rdtf : relDataType.getFieldList()) {
+      names.add(rdtf.getName());
+    }
+    return names;
+  }
+
+}
+
+// End ElasticsearchAggregate.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
index ed628cc..2c4c42c 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
@@ -30,18 +30,9 @@ interface ElasticsearchConstants {
   String FIELDS = "fields";
   String SOURCE_PAINLESS = "params._source";
   String SOURCE_GROOVY = "_source";
-  String SOURCE = SOURCE_GROOVY;
   String ID = "_id";
   String UID = "_uid";
 
-  /* Aggregation pushdown operations supported */
-  String AGG_SUM = "SUM";
-  String AGG_SUM0 = "$SUM0";
-  String AGG_COUNT = "COUNT";
-  String AGG_MIN = "MIN";
-  String AGG_MAX = "MAX";
-  String AGG_AVG = "AVG";
-
   Set<String> META_COLUMNS = ImmutableSet.of(UID, ID, TYPE, INDEX);
 
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
index d87de7e..16ac92d 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
@@ -26,27 +26,27 @@ import java.util.Map;
 
 /**
  * Util functions which convert
- * {@link 
org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit}
+ * {@link ElasticsearchJson.SearchHit}
  * into calcite specific return type (map, object[], list etc.)
  */
 class ElasticsearchEnumerators {
 
   private ElasticsearchEnumerators() {}
 
-  private static Function1<ElasticsearchSearchResult.SearchHit, Map> 
mapGetter() {
-    return new Function1<ElasticsearchSearchResult.SearchHit, Map>() {
-      public Map apply(ElasticsearchSearchResult.SearchHit hits) {
+  private static Function1<ElasticsearchJson.SearchHit, Map> mapGetter() {
+    return new Function1<ElasticsearchJson.SearchHit, Map>() {
+      public Map apply(ElasticsearchJson.SearchHit hits) {
         return hits.sourceOrFields();
       }
     };
   }
 
-  private static Function1<ElasticsearchSearchResult.SearchHit, Object> 
singletonGetter(
+  private static Function1<ElasticsearchJson.SearchHit, Object> 
singletonGetter(
       final String fieldName,
       final Class fieldClass) {
-    return new Function1<ElasticsearchSearchResult.SearchHit, Object>() {
-      public Object apply(ElasticsearchSearchResult.SearchHit hits) {
-        return convert(hits.sourceOrFields(), fieldClass);
+    return new Function1<ElasticsearchJson.SearchHit, Object>() {
+      public Object apply(ElasticsearchJson.SearchHit hits) {
+        return convert(hits.valueOrNull(fieldName), fieldClass);
       }
     };
   }
@@ -59,30 +59,38 @@ class ElasticsearchEnumerators {
    *
    * @return function that converts the search result into a generic array
    */
-  private static Function1<ElasticsearchSearchResult.SearchHit, Object[]> 
listGetter(
+  private static Function1<ElasticsearchJson.SearchHit, Object[]> listGetter(
       final List<Map.Entry<String, Class>> fields) {
-    return new Function1<ElasticsearchSearchResult.SearchHit, Object[]>() {
-      public Object[] apply(ElasticsearchSearchResult.SearchHit hit) {
+    return new Function1<ElasticsearchJson.SearchHit, Object[]>() {
+      public Object[] apply(ElasticsearchJson.SearchHit hit) {
         Object[] objects = new Object[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
           final Map.Entry<String, Class> field = fields.get(i);
           final String name = field.getKey();
           final Class type = field.getValue();
-          objects[i] = convert(hit.value(name), type);
+          objects[i] = convert(hit.valueOrNull(name), type);
         }
         return objects;
       }
     };
   }
 
-  static Function1<ElasticsearchSearchResult.SearchHit, Object> getter(
+  static Function1<ElasticsearchJson.SearchHit, Object> getter(
       List<Map.Entry<String, Class>> fields) {
     //noinspection unchecked
-    return fields == null
-      ? (Function1) mapGetter()
-      : fields.size() == 1
-      ? singletonGetter(fields.get(0).getKey(), fields.get(0).getValue())
-      : (Function1) listGetter(fields);
+    final Function1 getter;
+    if (fields == null || fields.size() == 1 && 
"_MAP".equals(fields.get(0).getKey())) {
+      // select * from table
+      getter = mapGetter();
+    } else if (fields.size() == 1) {
+      // select foo from table
+      getter = singletonGetter(fields.get(0).getKey(), 
fields.get(0).getValue());
+    } else {
+      // select a, b, c from table
+      getter = listGetter(fields);
+    }
+
+    return getter;
   }
 
   private static Object convert(Object o, Class clazz) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
index 4d187b1..c339671 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
@@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -70,24 +69,13 @@ public class ElasticsearchFilter extends Filter implements 
ElasticsearchRel {
 
   @Override public void implement(Implementor implementor) {
     implementor.visitChild(0, getInput());
-    List<String> fieldNames;
-    if (input instanceof Project) {
-      final List<RexNode> projects = ((Project) input).getProjects();
-      fieldNames = new ArrayList<>(projects.size());
-      for (RexNode project : projects) {
-        String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
-        fieldNames.add(name);
-      }
-    } else {
-      fieldNames = ElasticsearchRules.elasticsearchFieldNames(getRowType());
-    }
     ObjectMapper mapper = implementor.elasticsearchTable.mapper;
     PredicateAnalyzerTranslator translator = new 
PredicateAnalyzerTranslator(mapper);
     try {
       implementor.add(translator.translateMatch(condition));
     } catch (IOException e) {
       throw new UncheckedIOException(e);
-    } catch (ExpressionNotAnalyzableException e) {
+    } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) {
       throw new RuntimeException(e);
     }
   }
@@ -103,7 +91,8 @@ public class ElasticsearchFilter extends Filter implements 
ElasticsearchRel {
       this.mapper = Objects.requireNonNull(mapper, "mapper");
     }
 
-    String translateMatch(RexNode condition) throws IOException, 
ExpressionNotAnalyzableException {
+    String translateMatch(RexNode condition) throws IOException,
+        PredicateAnalyzer.ExpressionNotAnalyzableException {
 
       StringWriter writer = new StringWriter();
       JsonGenerator generator = mapper.getFactory().createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
new file mode 100644
index 0000000..7c80e82
--- /dev/null
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.StreamSupport;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * Internal objects (and deserializers) used to parse elastic search results
+ * (which are in JSON format).
+ *
+ * <p>Since we're using basic row-level rest client http response has to be
+ * processed manually using JSON (jackson) library.
+ */
+class ElasticsearchJson {
+
+  /**
+   * Used as special aggregation key for missing values (documents which are 
missing a field).
+   * Buckets with that value are then converted to {@code null}s in flat 
tabular format.
+   * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html";>Missing
 Value</a>
+   */
+  static final JsonNode MISSING_VALUE = 
JsonNodeFactory.instance.textNode("__MISSING__");
+
+  private ElasticsearchJson() {}
+
+  /**
+   * Visits leaves of the aggregation where all values are stored.
+   */
+  static void visitValueNodes(Aggregations aggregations, Consumer<Map<String, 
Object>> consumer) {
+    Objects.requireNonNull(aggregations, "aggregations");
+    Objects.requireNonNull(consumer, "consumer");
+
+    List<Bucket> buckets = new ArrayList<>();
+
+    Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
+
+    BiConsumer<RowKey, MultiValue> cons = (r, v) ->
+        rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
+    aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
+    rows.forEach((k, v) -> {
+      Map<String, Object> row = new LinkedHashMap<>(k.keys);
+      v.forEach(val -> row.put(val.getName(), val.value()));
+      consumer.accept(row);
+    });
+  }
+
+  /**
+   * Identifies a calcite row (as in relational algebra)
+   */
+  private static class RowKey {
+    private final Map<String, Object> keys;
+    private final int hashCode;
+
+    private RowKey(final Map<String, Object> keys) {
+      this.keys = Objects.requireNonNull(keys, "keys");
+      this.hashCode = Objects.hashCode(keys);
+    }
+
+    private RowKey(List<Bucket> buckets) {
+      this(toMap(buckets));
+    }
+
+    private static Map<String, Object> toMap(Iterable<Bucket> buckets) {
+      return StreamSupport.stream(buckets.spliterator(), false)
+          .collect(LinkedHashMap::new,
+              (m, v) -> m.put(v.getName(), v.key()),
+              LinkedHashMap::putAll);
+    }
+
+    @Override public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      final RowKey rowKey = (RowKey) o;
+      return hashCode == rowKey.hashCode
+          && Objects.equals(keys, rowKey.keys);
+    }
+
+    @Override public int hashCode() {
+      return this.hashCode;
+    }
+  }
+
+  private static void visitValueNodes(Aggregation aggregation, List<Bucket> 
parents,
+      BiConsumer<RowKey, MultiValue> consumer) {
+
+    if (aggregation instanceof MultiValue) {
+      // publish one value of the row
+      RowKey key = new RowKey(parents);
+      consumer.accept(key, (MultiValue) aggregation);
+      return;
+    }
+
+    if (aggregation instanceof Bucket) {
+      Bucket bucket = (Bucket) aggregation;
+      parents.add(bucket);
+      bucket.getAggregations().forEach(a -> visitValueNodes(a, parents, 
consumer));
+      parents.remove(parents.size() - 1);
+    } else if (aggregation instanceof HasAggregations) {
+      HasAggregations children = (HasAggregations) aggregation;
+      children.getAggregations().forEach(a -> visitValueNodes(a, parents, 
consumer));
+    } else if (aggregation instanceof MultiBucketsAggregation) {
+      MultiBucketsAggregation multi = (MultiBucketsAggregation) aggregation;
+      multi.buckets().forEach(b -> {
+        parents.add(b);
+        b.getAggregations().forEach(a -> visitValueNodes(a, parents, 
consumer));
+        parents.remove(parents.size() - 1);
+      });
+    }
+
+  }
+
+  /**
+   * Response from Elastic
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  static class Result {
+    private final SearchHits hits;
+    private final Aggregations aggregations;
+    private final long took;
+
+    /**
+     * Constructor for this instance.
+     * @param hits list of matched documents
+     * @param took time taken (in took) for this query to execute
+     */
+    @JsonCreator
+    Result(@JsonProperty("hits") SearchHits hits,
+        @JsonProperty("aggregations") Aggregations aggregations,
+        @JsonProperty("took") long took) {
+      this.hits = Objects.requireNonNull(hits, "hits");
+      this.aggregations = aggregations;
+      this.took = took;
+    }
+
+    SearchHits searchHits() {
+      return hits;
+    }
+
+    Aggregations aggregations() {
+      return aggregations;
+    }
+
+    public Duration took() {
+      return Duration.ofMillis(took);
+    }
+
+  }
+
+  /**
+   * Similar to {@code SearchHits} in ES. Container for {@link SearchHit}
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  static class SearchHits {
+
+    private final long total;
+    private final List<SearchHit> hits;
+
+    @JsonCreator
+    SearchHits(@JsonProperty("total")final long total,
+               @JsonProperty("hits") final List<SearchHit> hits) {
+      this.total = total;
+      this.hits = Objects.requireNonNull(hits, "hits");
+    }
+
+    public List<SearchHit> hits() {
+      return this.hits;
+    }
+
+    public long total() {
+      return total;
+    }
+
+  }
+
+  /**
+   * Concrete result record which matched the query. Similar to {@code 
SearchHit} in ES.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  static class SearchHit {
+    private final String id;
+    private final Map<String, Object> source;
+    private final Map<String, Object> fields;
+
+    @JsonCreator
+    SearchHit(@JsonProperty("_id") final String id,
+                      @JsonProperty("_source") final Map<String, Object> 
source,
+                      @JsonProperty("fields") final Map<String, Object> 
fields) {
+      this.id = Objects.requireNonNull(id, "id");
+
+      // both can't be null
+      if (source == null && fields == null) {
+        final String message = String.format(Locale.ROOT,
+            "Both '_source' and 'fields' are missing for %s", id);
+        throw new IllegalArgumentException(message);
+      }
+
+      // both can't be non-null
+      if (source != null && fields != null) {
+        final String message = String.format(Locale.ROOT,
+            "Both '_source' and 'fields' are populated (non-null) for %s", id);
+        throw new IllegalArgumentException(message);
+      }
+
+      this.source = source;
+      this.fields = fields;
+    }
+
+    /**
+     * Returns id of this hit (usually document id)
+     * @return unique id
+     */
+    public String id() {
+      return id;
+    }
+
+    Object valueOrNull(String name) {
+      Objects.requireNonNull(name, "name");
+      if (fields != null && fields.containsKey(name)) {
+        Object field = fields.get(name);
+        if (field instanceof Iterable) {
+          // return first element (or null)
+          Iterator<?> iter = ((Iterable<?>) field).iterator();
+          return iter.hasNext() ? iter.next() : null;
+        }
+
+        return field;
+      }
+
+      return valueFromPath(source, name);
+    }
+
+    /**
+     * Returns property from nested maps given a path like {@code a.b.c}.
+     * @param map current map
+     * @param path field path(s), optionally with dots ({@code a.b.c}).
+     * @return value located at path {@code path} or {@code null} if not found.
+     */
+    private static Object valueFromPath(Map<String, Object> map, String path) {
+      if (map == null) {
+        return null;
+      }
+
+      if (map.containsKey(path)) {
+        return map.get(path);
+      }
+
+      // maybe pattern of type a.b.c
+      final int index = path.indexOf('.');
+      if (index == -1) {
+        return null;
+      }
+
+      final String prefix = path.substring(0, index);
+      final String suffix = path.substring(index + 1);
+
+      Object maybeMap = map.get(prefix);
+      if (maybeMap instanceof Map) {
+        return valueFromPath((Map<String, Object>) maybeMap, suffix);
+      }
+
+      return null;
+    }
+
+    Map<String, Object> source() {
+      return source;
+    }
+
+    Map<String, Object> fields() {
+      return fields;
+    }
+
+    Map<String, Object> sourceOrFields() {
+      return source != null ? source : fields;
+    }
+  }
+
+
+  /**
+   * {@link Aggregation} container.
+   */
+  @JsonDeserialize(using = AggregationsDeserializer.class)
+  static class Aggregations implements Iterable<Aggregation> {
+
+    private final List<? extends Aggregation> aggregations;
+    private Map<String, Aggregation> aggregationsAsMap;
+
+    Aggregations(List<? extends Aggregation> aggregations) {
+      this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
+    }
+
+    /**
+     * Iterates over the {@link Aggregation}s.
+     */
+    @Override public final Iterator<Aggregation> iterator() {
+      return asList().iterator();
+    }
+
+    /**
+     * The list of {@link Aggregation}s.
+     */
+    final List<Aggregation> asList() {
+      return Collections.unmodifiableList(aggregations);
+    }
+
+    /**
+     * Returns the {@link Aggregation}s keyed by aggregation name. Lazy init.
+     */
+    final Map<String, Aggregation> asMap() {
+      if (aggregationsAsMap == null) {
+        Map<String, Aggregation> map = new 
LinkedHashMap<>(aggregations.size());
+        for (Aggregation aggregation : aggregations) {
+          map.put(aggregation.getName(), aggregation);
+        }
+        this.aggregationsAsMap = unmodifiableMap(map);
+      }
+      return aggregationsAsMap;
+    }
+
+    /**
+     * Returns the aggregation that is associated with the specified name.
+     */
+    @SuppressWarnings("unchecked")
+    public final <A extends Aggregation> A get(String name) {
+      return (A) asMap().get(name);
+    }
+
+    @Override public final boolean equals(Object obj) {
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      return aggregations.equals(((Aggregations) obj).aggregations);
+    }
+
+    @Override public final int hashCode() {
+      return Objects.hash(getClass(), aggregations);
+    }
+
+  }
+
+  /**
+   * Identifies all aggregations
+   */
+  interface Aggregation {
+
+    /**
+     * @return The name of this aggregation.
+     */
+    String getName();
+
+  }
+
+  /**
+   * Allows traversing aggregations tree
+   */
+  interface HasAggregations {
+    Aggregations getAggregations();
+  }
+
+  /**
+   * An aggregation that returns multiple buckets
+   */
+  static class MultiBucketsAggregation implements Aggregation {
+
+    private final String name;
+    private final List<Bucket> buckets;
+
+    MultiBucketsAggregation(final String name,
+        final List<Bucket> buckets) {
+      this.name = name;
+      this.buckets = buckets;
+    }
+
+    /**
+     * @return  The buckets of this aggregation.
+     */
+    List<Bucket> buckets() {
+      return buckets;
+    }
+
+    @Override public String getName() {
+      return name;
+    }
+  }
+
+  /**
+   * A bucket represents a criteria to which all documents that fall in it 
adhere to.
+   * It is also uniquely identified
+   * by a key, and can potentially hold sub-aggregations computed over all 
documents in it.
+   */
+  static class Bucket implements HasAggregations, Aggregation {
+    private final Object key;
+    private final String name;
+    private final Aggregations aggregations;
+
+    Bucket(final Object key,
+        final String name,
+        final Aggregations aggregations) {
+      this.key = key; // key can be set after construction
+      this.name = Objects.requireNonNull(name, "name");
+      this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
+    }
+
+    /**
+     * @return The key associated with the bucket
+     */
+    Object key() {
+      return key;
+    }
+
+    /**
+     * @return The key associated with the bucket as a string
+     */
+    String keyAsString() {
+      return Objects.toString(key());
+    }
+
+    /**
+     * @return  The sub-aggregations of this bucket
+     */
+    @Override public Aggregations getAggregations() {
+      return aggregations;
+    }
+
+    @Override public String getName() {
+      return name;
+    }
+  }
+
+  /**
+   * Multi value aggregatoin like
+   * <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html";>Stats</a>
+   */
+  static class MultiValue implements Aggregation {
+    private final String name;
+    private final Map<String, Object> values;
+
+    MultiValue(final String name, final Map<String, Object> values) {
+      this.name = Objects.requireNonNull(name, "name");
+      this.values = Objects.requireNonNull(values, "values");
+    }
+
+    @Override public String getName() {
+      return name;
+    }
+
+    Map<String, Object> values() {
+      return values;
+    }
+
+    /**
+     * For single value. Returns single value represented by this leaf 
aggregation.
+     * @return value corresponding to {@code value}
+     */
+    Object value() {
+      if (!values().containsKey("value")) {
+        throw new IllegalStateException("'value' field not present in this 
aggregation");
+      }
+
+      return values().get("value");
+    }
+
+  }
+
+  /**
+   * Allows to de-serialize nested aggregation structures.
+   */
+  static class AggregationsDeserializer extends StdDeserializer<Aggregations> {
+
+    private static final Set<String> IGNORE_TOKENS = new 
HashSet<>(Arrays.asList("meta",
+        "buckets", "value", "values", "value_as_string", "doc_count", "key", 
"key_as_string"));
+
+    AggregationsDeserializer() {
+      super(Aggregations.class);
+    }
+
+    @Override public Aggregations deserialize(final JsonParser parser,
+        final DeserializationContext ctxt)
+        throws IOException  {
+
+      ObjectNode node = parser.getCodec().readTree(parser);
+      return parseAggregations(parser, node);
+    }
+
+    private static Aggregations parseAggregations(JsonParser parser, 
ObjectNode node)
+        throws JsonProcessingException {
+
+      List<Aggregation> aggregations = new ArrayList<>();
+
+      Iterable<Map.Entry<String, JsonNode>> iter = node::fields;
+      for (Map.Entry<String, JsonNode> entry : iter) {
+        final String name = entry.getKey();
+        final JsonNode value = entry.getValue();
+
+        Aggregation agg = null;
+        if (value.has("buckets")) {
+          agg = parseBuckets(parser, name, (ArrayNode) value.get("buckets"));
+        } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) {
+          // leaf
+          agg = parseValue(parser, name, (ObjectNode) value);
+        }
+
+        if (agg != null) {
+          aggregations.add(agg);
+        }
+      }
+
+      return new Aggregations(aggregations);
+    }
+
+
+
+    private static MultiValue parseValue(JsonParser parser, String name, 
ObjectNode node)
+        throws JsonProcessingException {
+
+      return new MultiValue(name, parser.getCodec().treeToValue(node, 
Map.class));
+    }
+
+    private static Aggregation parseBuckets(JsonParser parser, String name, 
ArrayNode nodes)
+        throws JsonProcessingException {
+
+      List<Bucket> buckets = new ArrayList<>(nodes.size());
+      for (JsonNode b: nodes) {
+        buckets.add(parseBucket(parser, name, (ObjectNode) b));
+      }
+
+      return new MultiBucketsAggregation(name, buckets);
+    }
+
+    /**
+     * Determines if current key is a missing field key. Missing key is 
returned when document
+     * does not have pivoting attribute (example {@code GROUP BY 
_MAP['a.b.missing']}). It helps
+     * grouping documents which don't have a field. In relational algebra this
+     * would be {@code null}.
+     *
+     * @param key current {@code key} (usually string) as returned by ES
+     * @return {@code true} if this value
+     * @see #MISSING_VALUE
+     */
+    private static boolean isMissingBucket(JsonNode key) {
+      return MISSING_VALUE.equals(key);
+    }
+
+    private static Bucket parseBucket(JsonParser parser, String name, 
ObjectNode node)
+        throws JsonProcessingException  {
+
+      final JsonNode keyNode = node.get("key");
+      final Object key;
+      if (isMissingBucket(keyNode) || keyNode.isNull()) {
+        key = null;
+      } else if (keyNode.isTextual()) {
+        key = keyNode.textValue();
+      } else if (keyNode.isNumber()) {
+        key = keyNode.numberValue();
+      } else if (keyNode.isBoolean()) {
+        key = keyNode.booleanValue();
+      } else {
+        // don't usually expect keys to be Objects
+        key = parser.getCodec().treeToValue(node, Map.class);
+      }
+
+      return new Bucket(key, name, parseAggregations(parser, node));
+    }
+
+  }
+}
+
+// End ElasticsearchJson.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
index 72753e6..709156f 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
@@ -27,8 +27,17 @@ import java.util.List;
  * Builtin methods in the Elasticsearch adapter.
  */
 enum ElasticsearchMethod {
-  
ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class,
-      "find", List.class, List.class);
+
+  ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class,
+      "find",
+      List.class, // ops  - projections and other stuff
+      List.class, // fields
+      List.class, // sort
+      List.class, // groupBy
+      List.class, // aggregations
+      Long.class, // offset
+      Long.class // fetch
+      );
 
   public final Method method;
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
index 7d5811c..d0841c3 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -101,11 +101,7 @@ public class ElasticsearchProject extends Project 
implements ElasticsearchRel {
       query.append("\"script_fields\": {" + String.join(", ", scriptFields) + 
"}");
     }
 
-    for (String opfield : implementor.list) {
-      if (opfield.startsWith("\"_source\"")) {
-        implementor.list.remove(opfield);
-      }
-    }
+    implementor.list.removeIf(l -> l.startsWith("\"_source\""));
     implementor.add(query.toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
index 436adf9..1dad691 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
@@ -18,10 +18,14 @@ package org.apache.calcite.adapter.elasticsearch;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.Pair;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  * Relational expression that uses Elasticsearch calling convention.
@@ -39,19 +43,75 @@ public interface ElasticsearchRel extends RelNode {
    * {@link ElasticsearchRel} nodes into an Elasticsearch query.
    */
   class Implementor {
+
     final List<String> list = new ArrayList<>();
 
+    /**
+     * Sorting clauses.
+     * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html";>Sort</a>
+     */
+    final List<Map.Entry<String, RelFieldCollation.Direction>> sort = new 
ArrayList<>();
+
+    /**
+     * Elastic aggregation ({@code MIN / MAX / COUNT} etc.) statements 
(functions).
+     * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html";>aggregations</a>
+     */
+    final List<Map.Entry<String, String>> aggregations = new ArrayList<>();
+
+    /**
+     * Allows bucketing documents together. Similar to {@code select ... from 
table group by field1}
+     * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-bucket.html";>Bucket
 Aggregrations</a>
+     */
+    final List<String> groupBy = new ArrayList<>();
+
+    /**
+     * Starting index (default {@code 0}). Equivalent to {@code start} in ES 
query.
+     * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html";>From/Size</a>
+     */
+    Long offset;
+
+    /**
+     * Number of records to return. Equivalent to {@code size} in ES query.
+     * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html";>From/Size</a>
+     */
+    Long fetch;
+
     RelOptTable table;
-    AbstractElasticsearchTable elasticsearchTable;
+    ElasticsearchTable elasticsearchTable;
 
-    public void add(String findOp) {
+    void add(String findOp) {
       list.add(findOp);
     }
 
-    public void visitChild(int ordinal, RelNode input) {
+    void addGroupBy(String field) {
+      Objects.requireNonNull(field, "field");
+      groupBy.add(field);
+    }
+
+    void addSort(String field, RelFieldCollation.Direction direction) {
+      Objects.requireNonNull(field, "field");
+      sort.add(new Pair<>(field, direction));
+    }
+
+    void addAggregation(String field, String expression) {
+      Objects.requireNonNull(field, "field");
+      Objects.requireNonNull(expression, "expression");
+      aggregations.add(new Pair<>(field, expression));
+    }
+
+    void offset(long offset) {
+      this.offset = offset;
+    }
+
+    void fetch(long fetch) {
+      this.fetch = fetch;
+    }
+
+    void visitChild(int ordinal, RelNode input) {
       assert ordinal == 0;
       ((ElasticsearchRel) input).implement(this);
     }
+
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
index 97e934c..b442ddd 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -23,10 +23,12 @@ import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataType;
@@ -53,7 +55,8 @@ class ElasticsearchRules {
   static final RelOptRule[] RULES = {
       ElasticsearchSortRule.INSTANCE,
       ElasticsearchFilterRule.INSTANCE,
-      ElasticsearchProjectRule.INSTANCE
+      ElasticsearchProjectRule.INSTANCE,
+      ElasticsearchAggregateRule.INSTANCE
   };
 
   private ElasticsearchRules() {}
@@ -147,7 +150,7 @@ class ElasticsearchRules {
         }
       }
       throw new IllegalArgumentException("Translation of " + call.toString()
-        + "is not supported by ElasticsearchProject");
+        + " is not supported by ElasticsearchProject");
     }
 
     List<String> visitList(List<RexNode> list) {
@@ -217,6 +220,37 @@ class ElasticsearchRules {
   }
 
   /**
+   * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalAggregate}
+   * to an {@link ElasticsearchAggregate}.
+   */
+  private static class ElasticsearchAggregateRule extends 
ElasticsearchConverterRule {
+    static final RelOptRule INSTANCE = new ElasticsearchAggregateRule();
+
+    private ElasticsearchAggregateRule() {
+      super(LogicalAggregate.class, Convention.NONE, 
ElasticsearchRel.CONVENTION,
+          "ElasticsearchAggregateRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalAggregate agg = (LogicalAggregate) rel;
+      final RelTraitSet traitSet = agg.getTraitSet().replace(out);
+      try {
+        return new ElasticsearchAggregate(
+            rel.getCluster(),
+            traitSet,
+            convert(agg.getInput(), traitSet.simplify()),
+            agg.indicator,
+            agg.getGroupSet(),
+            agg.getGroupSets(),
+            agg.getAggCallList());
+      } catch (InvalidRelException e) {
+        return null;
+      }
+    }
+  }
+
+
+  /**
    * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
    * to an {@link ElasticsearchProject}.
    */

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
index 1c630ad..80a94be 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
@@ -30,6 +30,7 @@ import org.elasticsearch.client.RestClient;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
+import java.util.Collections;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
@@ -48,6 +49,8 @@ public class ElasticsearchSchema extends AbstractSchema {
 
   private final ObjectMapper mapper;
 
+  private final Map<String, Table> tableMap;
+
   /**
    * Allows schema to be instantiated from existing elastic search client.
    * This constructor is used in tests.
@@ -56,20 +59,33 @@ public class ElasticsearchSchema extends AbstractSchema {
    * @param index name of ES index
    */
   public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String 
index) {
+    this(client, mapper, index, null);
+  }
+
+  public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String 
index, String type) {
     super();
     this.client = Objects.requireNonNull(client, "client");
     this.mapper = Objects.requireNonNull(mapper, "mapper");
     this.index = Objects.requireNonNull(index, "index");
+    if (type == null) {
+      try {
+        this.tableMap = createTables(listTypesFromElastic());
+      } catch (IOException e) {
+        throw new UncheckedIOException("Couldn't get types for " + index, e);
+      }
+    } else {
+      this.tableMap = createTables(Collections.singleton(type));
+    }
   }
 
   @Override protected Map<String, Table> getTableMap() {
+    return tableMap;
+  }
+
+  private Map<String, Table> createTables(Iterable<String> types) {
     final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
-    try {
-      for (String type: listTypes()) {
-        builder.put(type, new ElasticsearchTable(client, mapper, index, type));
-      }
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to get types for " + index, e);
+    for (String type : types) {
+      builder.put(type, new ElasticsearchTable(client, mapper, index, type));
     }
     return builder.build();
   }
@@ -81,7 +97,7 @@ public class ElasticsearchSchema extends AbstractSchema {
    * @throws IOException for any IO related issues
    * @throws IllegalStateException if reply is not understood
    */
-  private Set<String> listTypes() throws IOException  {
+  private Set<String> listTypesFromElastic() throws IOException  {
     final String endpoint = "/" + index + "/_mapping";
     final Response response = client.performRequest("GET", endpoint);
     try (InputStream is = response.getEntity().getContent()) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
index ed669aa..9078b72 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
@@ -23,15 +23,12 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.Util;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -57,48 +54,22 @@ public class ElasticsearchSort extends Sort implements 
ElasticsearchRel {
 
   @Override public void implement(Implementor implementor) {
     implementor.visitChild(0, getInput());
-    if (!collation.getFieldCollations().isEmpty()) {
-      final List<String> keys = new ArrayList<>();
-      if (input instanceof Project) {
-        final List<RexNode> projects = ((Project) input).getProjects();
+    final List<RelDataTypeField> fields = getRowType().getFieldList();
 
-        for (RelFieldCollation fieldCollation : 
collation.getFieldCollations()) {
-          RexNode project = projects.get(fieldCollation.getFieldIndex());
-          String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
-          keys.add(ElasticsearchRules.quote(name) + ": " + 
direction(fieldCollation));
-        }
-      } else {
-        final List<RelDataTypeField> fields = getRowType().getFieldList();
-
-        for (RelFieldCollation fieldCollation : 
collation.getFieldCollations()) {
-          final String name = 
fields.get(fieldCollation.getFieldIndex()).getName();
-          keys.add(ElasticsearchRules.quote(name) + ": " + 
direction(fieldCollation));
-        }
-      }
-
-      implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") + 
"]");
+    for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
+      final String name = fields.get(fieldCollation.getFieldIndex()).getName();
+      implementor.addSort(name, fieldCollation.getDirection());
     }
 
     if (offset != null) {
-      implementor.add("\"from\": " + ((RexLiteral) offset).getValue());
+      implementor.offset(((RexLiteral) offset).getValueAs(Long.class));
     }
 
     if (fetch != null) {
-      implementor.add("\"size\": " + ((RexLiteral) fetch).getValue());
+      implementor.fetch(((RexLiteral) fetch).getValueAs(Long.class));
     }
   }
 
-  private String direction(RelFieldCollation fieldCollation) {
-    switch (fieldCollation.getDirection()) {
-    case DESCENDING:
-    case STRICTLY_DESCENDING:
-      return "\"desc\"";
-    case ASCENDING:
-    case STRICTLY_ASCENDING:
-    default:
-      return "\"asc\"";
-    }
-  }
 }
 
 // End ElasticsearchSort.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index 955636e..c404da7 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -16,9 +16,24 @@
  */
 package org.apache.calcite.adapter.elasticsearch;
 
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
 import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
 import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Util;
 
 import org.apache.http.HttpEntity;
@@ -29,26 +44,47 @@ import org.apache.http.util.EntityUtils;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * Table based on an Elasticsearch type.
  */
-public class ElasticsearchTable extends AbstractElasticsearchTable {
+public class ElasticsearchTable extends AbstractQueryableTable implements 
TranslatableTable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchTable.class);
+
+  /**
+   * Used for constructing (possibly nested) Elastic aggregation nodes.
+   */
+  private static final String AGGREGATIONS = "aggregations";
+
   private final RestClient restClient;
   private final ElasticsearchVersion version;
-
+  private final String indexName;
+  private final String typeName;
+  final ObjectMapper mapper;
 
   /**
    * Creates an ElasticsearchTable.
@@ -58,7 +94,7 @@ public class ElasticsearchTable extends 
AbstractElasticsearchTable {
    * @param typeName elastic searh index type
    */
   ElasticsearchTable(RestClient client, ObjectMapper mapper, String indexName, 
String typeName) {
-    super(indexName, typeName, Objects.requireNonNull(mapper, "mapper"));
+    super(Object[].class);
     this.restClient = Objects.requireNonNull(client, "client");
     try {
       this.version = detectVersion(client, mapper);
@@ -67,6 +103,9 @@ public class ElasticsearchTable extends 
AbstractElasticsearchTable {
           + "for %s/%s", indexName, typeName);
       throw new UncheckedIOException(message, e);
     }
+    this.indexName = Objects.requireNonNull(indexName, "indexName");
+    this.typeName = Objects.requireNonNull(typeName, "typeName");
+    this.mapper = Objects.requireNonNull(mapper, "mapper");
 
   }
 
@@ -87,37 +126,211 @@ public class ElasticsearchTable extends 
AbstractElasticsearchTable {
     return 
ElasticsearchVersion.fromString(node.get("version").get("number").asText());
   }
 
-  @Override protected String scriptedFieldPrefix() {
+  /**
+   * In ES 5.x scripted fields start with {@code params._source.foo} while in 
ES2.x
+   * {@code _source.foo}. Helper method to build correct query based on 
runtime version of elastic.
+   * Used to keep backwards compatibility with ES2.
+   *
+   * @see <a 
href="https://github.com/elastic/elasticsearch/issues/20068";>_source 
variable</a>
+   * @see <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html";>Scripted
 Fields</a>
+   * @return string to be used for scripted fields
+   */
+  String scriptedFieldPrefix() {
     // ES2 vs ES5 scripted field difference
     return version == ElasticsearchVersion.ES2
         ? ElasticsearchConstants.SOURCE_GROOVY
         : ElasticsearchConstants.SOURCE_PAINLESS;
   }
 
-  @Override protected Enumerable<Object> find(String index, List<String> ops,
-      List<Map.Entry<String, Class>> fields) {
+  /**
+   * Executes a "find" operation on the underlying type.
+   *
+   * <p>For example,
+   * <code>client.prepareSearch(index).setTypes(type)
+   * .setSource("{\"fields\" : [\"state\"]}")</code></p>
+   *
+   * @param ops List of operations represented as Json strings.
+   * @param fields List of fields to project; or null to return map
+   * @param sort list of fields to sort and their direction (asc/desc)
+   * @param aggregations aggregation functions
+   * @return Enumerator of results
+   */
+  protected Enumerable<Object> find(List<String> ops,
+      List<Map.Entry<String, Class>> fields,
+      List<Map.Entry<String, RelFieldCollation.Direction>> sort,
+      List<String> groupBy,
+      List<Map.Entry<String, String>> aggregations,
+      Long offset, Long fetch) throws IOException {
+
+    if (!aggregations.isEmpty()) {
+      // process aggregations separately
+      return aggregate(ops, fields, sort, groupBy, aggregations, offset, 
fetch);
+    }
+
+    final ObjectNode query = mapper.createObjectNode();
 
-    final String query;
-    if (!ops.isEmpty()) {
-      query = "{" + Util.toString(ops, "", ", ", "") + "}";
-    } else {
-      query = "{}";
+    // manually parse from previously concatenated string
+    query.setAll(
+        (ObjectNode) mapper.readTree("{"
+            + Util.toString(ops, "", ", ", "") + "}"));
+
+    if (!sort.isEmpty()) {
+      ArrayNode sortNode = query.withArray("sort");
+      sort.forEach(e ->
+          sortNode.add(
+            mapper.createObjectNode().put(e.getKey(), 
e.getValue().isDescending() ? "desc" : "asc"))
+      );
+    }
+
+    if (offset != null) {
+      query.put("from", offset);
+    }
+
+    if (fetch != null) {
+      query.put("size", fetch);
     }
 
     try {
-      ElasticsearchSearchResult result = httpRequest(query);
-      final Function1<ElasticsearchSearchResult.SearchHit, Object> getter =
+      ElasticsearchJson.Result search = httpRequest(query);
+      final Function1<ElasticsearchJson.SearchHit, Object> getter =
           ElasticsearchEnumerators.getter(fields);
-      return Linq4j.asEnumerable(result.searchHits().hits()).select(getter);
+      return Linq4j.asEnumerable(search.searchHits().hits()).select(getter);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
   }
 
-  private ElasticsearchSearchResult httpRequest(String query) throws 
IOException {
+  private Enumerable<Object> aggregate(List<String> ops,
+      List<Map.Entry<String, Class>> fields,
+      List<Map.Entry<String, RelFieldCollation.Direction>> sort,
+      List<String> groupBy,
+      List<Map.Entry<String, String>> aggregations,
+      Long offset, Long fetch) throws IOException {
+
+    if (aggregations.isEmpty()) {
+      throw new IllegalArgumentException("Missing Aggregations");
+    }
+
+    if (!groupBy.isEmpty() && offset != null) {
+      String message = "Currently ES doesn't support generic pagination "
+          + "with aggregations. You can still use LIMIT keyword (without 
OFFSET). "
+          + "For more details see 
https://github.com/elastic/elasticsearch/issues/4915";;
+      throw new IllegalStateException(message);
+    }
+
+    final ObjectNode query = mapper.createObjectNode();
+
+    // manually parse into JSON from previously concatenated strings
+    query.setAll((ObjectNode) mapper.readTree("{" + Util.toString(ops, "", ", 
", "") + "}"));
+
+    // remove / override attributes which are not applicable to aggregations
+    query.put("_source", false);
+    query.put("size", 0);
+    query.remove("script_fields");
+
+    // allows to detect aggregation for count(*)
+    final Predicate<Map.Entry<String, String>> isCountStar = e -> e.getValue()
+            .contains("\"" + ElasticsearchConstants.ID + "\"");
+
+    // list of expressions which are count(*)
+    final Set<String> countAll = aggregations.stream()
+            .filter(isCountStar)
+        .map(Map.Entry::getKey).collect(Collectors.toSet());
+
+    final Map<String, String> fieldMap = new HashMap<>();
+
+    // due to ES aggregation format. fields in "order by" clause should go 
first
+    // if "order by" is missing. order in "group by" is un-important
+    final Set<String> orderedGroupBy = new LinkedHashSet<>();
+    
orderedGroupBy.addAll(sort.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
+    orderedGroupBy.addAll(groupBy);
+
+    // construct nested aggregations node(s)
+    ObjectNode parent = query.with(AGGREGATIONS);
+    for (String name: orderedGroupBy) {
+      final String aggName = "g_" + name;
+      fieldMap.put(aggName, name);
+
+      final ObjectNode section = parent.with(aggName);
+      final ObjectNode terms = section.with("terms");
+      terms.put("field", name);
+      terms.set("missing", ElasticsearchJson.MISSING_VALUE); // expose missing 
terms
+
+      if (fetch != null) {
+        terms.put("size", fetch);
+      }
+
+      sort.stream().filter(e -> e.getKey().equals(name)).findAny().ifPresent(s 
-> {
+        terms.with("order").put("_key", s.getValue().isDescending() ? "desc" : 
"asc");
+      });
+
+      parent = section.with(AGGREGATIONS);
+    }
+
+    // simple version for queries like "select count(*), max(col1) from table" 
(no GROUP BY cols)
+    if (!groupBy.isEmpty() || !aggregations.stream().allMatch(isCountStar)) {
+      for (Map.Entry<String, String> aggregation : aggregations) {
+        JsonNode value = mapper.readTree("{" + aggregation.getValue()  + "}");
+        parent.set(aggregation.getKey(), value);
+      }
+    }
+
+    // cleanup query. remove empty AGGREGATIONS element (if empty)
+    JsonNode agg = query;
+    while (agg.has(AGGREGATIONS) && 
agg.get(AGGREGATIONS).elements().hasNext()) {
+      agg = agg.get(AGGREGATIONS);
+    }
+    ((ObjectNode) agg).remove(AGGREGATIONS);
+
+    ElasticsearchJson.Result res = httpRequest(query);
+
+    final List<Map<String, Object>> result = new ArrayList<>();
+    if (res.aggregations() != null) {
+      // collect values
+      ElasticsearchJson.visitValueNodes(res.aggregations(), m -> {
+        Map<String, Object> newMap = new LinkedHashMap<>();
+        for (String key: m.keySet()) {
+          newMap.put(fieldMap.getOrDefault(key, key), m.get(key));
+        }
+        result.add(newMap);
+      });
+    } else {
+      // probably no group by. add single result
+      result.add(new LinkedHashMap<>());
+    }
+
+    // elastic exposes total number of documents matching a query in 
"/hits/total" path
+    // this can be used for simple "select count(*) from table"
+    final long total = res.searchHits().total();
+
+    if (groupBy.isEmpty()) {
+      // put totals automatically for count(*) expression(s), unless they 
contain group by
+      for (String expr : countAll) {
+        result.forEach(m -> m.put(expr, total));
+      }
+    }
+
+    final Function1<ElasticsearchJson.SearchHit, Object> getter =
+        ElasticsearchEnumerators.getter(fields);
+
+    ElasticsearchJson.SearchHits hits =
+        new ElasticsearchJson.SearchHits(total, result.stream()
+            .map(r -> new ElasticsearchJson.SearchHit("_id", r, null))
+            .collect(Collectors.toList()));
+
+    return Linq4j.asEnumerable(hits.hits()).select(getter);
+  }
+
+  private ElasticsearchJson.Result httpRequest(ObjectNode query) throws 
IOException {
     Objects.requireNonNull(query, "query");
     String uri = String.format(Locale.ROOT, "/%s/%s/_search", indexName, 
typeName);
-    HttpEntity entity = new StringEntity(query, ContentType.APPLICATION_JSON);
+
+    Hook.QUERY_PLAN.run(query);
+    final String json = mapper.writeValueAsString(query);
+
+    LOGGER.debug("Elasticsearch Query: {}", json);
+
+    HttpEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
     Response response = restClient.performRequest("POST", uri, 
Collections.emptyMap(), entity);
     if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
       final String error = EntityUtils.toString(response.getEntity());
@@ -128,9 +341,75 @@ public class ElasticsearchTable extends 
AbstractElasticsearchTable {
     }
 
     try (InputStream is = response.getEntity().getContent()) {
-      return mapper.readValue(is, ElasticsearchSearchResult.class);
+      return mapper.readValue(is, ElasticsearchJson.Result.class);
     }
   }
+
+  @Override public RelDataType getRowType(RelDataTypeFactory 
relDataTypeFactory) {
+    final RelDataType mapType = relDataTypeFactory.createMapType(
+        relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
+        relDataTypeFactory.createTypeWithNullability(
+            relDataTypeFactory.createSqlType(SqlTypeName.ANY),
+            true));
+    return relDataTypeFactory.builder().add("_MAP", mapType).build();
+  }
+
+  @Override public String toString() {
+    return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
+  }
+
+  @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider, 
SchemaPlus schema,
+      String tableName) {
+    return new ElasticsearchQueryable<>(queryProvider, schema, this, 
tableName);
+  }
+
+  @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable 
relOptTable) {
+    final RelOptCluster cluster = context.getCluster();
+    return new ElasticsearchTableScan(cluster, 
cluster.traitSetOf(ElasticsearchRel.CONVENTION),
+        relOptTable, this, null);
+  }
+
+  /**
+   * Implementation of {@link Queryable} based on
+   * a {@link ElasticsearchTable}.
+   *
+   * @param <T> element type
+   */
+  public static class ElasticsearchQueryable<T> extends 
AbstractTableQueryable<T> {
+    ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
+        ElasticsearchTable table, String tableName) {
+      super(queryProvider, schema, table, tableName);
+    }
+
+    public Enumerator<T> enumerator() {
+      return null;
+    }
+
+    private ElasticsearchTable getTable() {
+      return (ElasticsearchTable) table;
+    }
+
+    /** Called via code-generation.
+     * @param ops list of queries (as strings)
+     * @param fields projection
+     * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
+     * @return result as enumerable
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Enumerable<Object> find(List<String> ops,
+         List<Map.Entry<String, Class>> fields,
+         List<Map.Entry<String, RelFieldCollation.Direction>> sort,
+         List<String> groupBy,
+         List<Map.Entry<String, String>> aggregations,
+         Long offset, Long fetch) {
+      try {
+        return getTable().find(ops, fields, sort, groupBy, aggregations, 
offset, fetch);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to query " + 
getTable().indexName, e);
+      }
+    }
+
+  }
 }
 
 // End ElasticsearchTable.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
index 7795ad3..3dd041a 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
@@ -37,7 +37,7 @@ import java.util.Objects;
  * using the "find" method.</p>
  */
 public class ElasticsearchTableScan extends TableScan implements 
ElasticsearchRel {
-  private final AbstractElasticsearchTable elasticsearchTable;
+  private final ElasticsearchTable elasticsearchTable;
   private final RelDataType projectRowType;
 
   /**
@@ -50,10 +50,10 @@ public class ElasticsearchTableScan extends TableScan 
implements ElasticsearchRe
    * @param projectRowType Fields and types to project; null to project raw row
    */
   ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet,
-       RelOptTable table, AbstractElasticsearchTable elasticsearchTable,
+       RelOptTable table, ElasticsearchTable elasticsearchTable,
        RelDataType projectRowType) {
     super(cluster, traitSet, table);
-    this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable);
+    this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable, 
"elasticsearchTable");
     this.projectRowType = projectRowType;
 
     assert getConvention() == ElasticsearchRel.CONVENTION;

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
index 51a2bd5..5e788a8 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
@@ -30,12 +30,10 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalcitePrepareImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterImpl;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.Pair;
 
@@ -60,15 +58,15 @@ public class ElasticsearchToEnumerableConverter extends 
ConverterImpl implements
     return super.computeSelfCost(planner, mq).multiplyBy(.1);
   }
 
-  @Override public Result implement(EnumerableRelImplementor implementor, 
Prefer prefer) {
-    final BlockBuilder list = new BlockBuilder();
-    final ElasticsearchRel.Implementor elasticsearchImplementor =
-        new ElasticsearchRel.Implementor();
-    elasticsearchImplementor.visitChild(0, getInput());
+  @Override public Result implement(EnumerableRelImplementor relImplementor, 
Prefer prefer) {
+    final BlockBuilder block = new BlockBuilder();
+    final ElasticsearchRel.Implementor implementor = new 
ElasticsearchRel.Implementor();
+    implementor.visitChild(0, getInput());
+
     final RelDataType rowType = getRowType();
-    final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), 
rowType,
+    final PhysType physType = PhysTypeImpl.of(relImplementor.getTypeFactory(), 
rowType,
         prefer.prefer(JavaRowFormat.ARRAY));
-    final Expression fields = list.append("fields",
+    final Expression fields = block.append("fields",
         constantArrayList(
             Pair.zip(ElasticsearchRules.elasticsearchFieldNames(rowType),
                 new AbstractList<Class>() {
@@ -81,20 +79,24 @@ public class ElasticsearchToEnumerableConverter extends 
ConverterImpl implements
                   }
                 }),
             Pair.class));
-    final Expression table = list.append("table",
-        elasticsearchImplementor.table
-            
.getExpression(AbstractElasticsearchTable.ElasticsearchQueryable.class));
-    List<String> opList = elasticsearchImplementor.list;
-    final Expression ops = list.append("ops", constantArrayList(opList, 
String.class));
-    Expression enumerable = list.append("enumerable",
+    final Expression table = block.append("table",
+        implementor.table
+            .getExpression(ElasticsearchTable.ElasticsearchQueryable.class));
+    List<String> opList = implementor.list;
+    final Expression ops = block.append("ops", constantArrayList(opList, 
String.class));
+    final Expression sort = block.append("sort", 
constantArrayList(implementor.sort, Pair.class));
+    final Expression groupBy = block.append("groupBy", 
Expressions.constant(implementor.groupBy));
+    final Expression aggregations = block.append("aggregations",
+        constantArrayList(implementor.aggregations, Pair.class));
+
+    final Expression offset = block.append("offset", 
Expressions.constant(implementor.offset));
+    final Expression fetch = block.append("fetch", 
Expressions.constant(implementor.fetch));
+
+    Expression enumerable = block.append("enumerable",
         Expressions.call(table, 
ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
-            fields));
-    if (CalcitePrepareImpl.DEBUG) {
-      System.out.println("Elasticsearch: " + opList);
-    }
-    Hook.QUERY_PLAN.run(opList);
-    list.add(Expressions.return_(null, enumerable));
-    return implementor.result(physType, list.toBlock());
+            fields, sort, groupBy, aggregations, offset, fetch));
+    block.add(Expressions.return_(null, enumerable));
+    return relImplementor.result(physType, block.toBlock());
   }
 
   /** E.g. {@code constantArrayList("x", "y")} returns

http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
index 97f7943..a866fe4 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
@@ -78,6 +78,16 @@ class PredicateAnalyzer {
     }
   }
 
+  /**
+   * Thrown when {@link org.apache.calcite.rel.RelNode} expression can't be 
processed
+   * (or converted into ES query)
+   */
+  static class ExpressionNotAnalyzableException extends Exception {
+    ExpressionNotAnalyzableException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private PredicateAnalyzer() {}
 
   /**

Reply via email to