Thanks Andrei. One thing: now that you’re a committer, your commit messages should not (must not) end with “(Andrei Sereda)”.
Julian

> On Sep 18, 2018, at 7:57 PM, [email protected] wrote:
>
> [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)
>
> Aggregate functions (count/sum/min/max/avg) are pushed down to ES.
>
> Add ElasticsearchAggregate relational expression to convert SQL into native
> Elastic aggregations (value_count, min, max etc.).
> Enhance ElasticsearchTable to prepare correct aggregate ES JSON query.
>
> Create special classes to parse recursively elastic aggregation response or
> buckets (located in ElasticJson). They're inspired from existing Elastic
> high-level client source.
>
> For tests, make Json input more human friendly. Single quotes are accepted
> and fields can be unquoted (unless they contain special characters). Also
> field with dots 'a.b.c' are automatically auto-expanded. This reduces JSON noise.
>
> Fix single projections which previously returned map (see [CALCITE-2485])
>
> Close apache/calcite#801
> Close apache/calcite#822
>
>
> Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
> Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/79af1c9b
> Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/79af1c9b
> Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/79af1c9b
>
> Branch: refs/heads/master
> Commit: 79af1c9ba735286653697deed3ff849b7c921fe4
> Parents: ce05146
> Author: Andrei Sereda <[email protected]>
> Authored: Tue Sep 18 22:53:24 2018 -0400
> Committer: Andrei Sereda <[email protected]>
> Committed: Tue Sep 18 22:53:24 2018 -0400
>
> ----------------------------------------------------------------------
> elasticsearch/pom.xml | 6 +
> .../AbstractElasticsearchTable.java | 150 -----
> .../elasticsearch/ElasticsearchAggregate.java | 165 +++++
> .../elasticsearch/ElasticsearchConstants.java | 9 -
> .../elasticsearch/ElasticsearchEnumerators.java | 44 +-
> .../elasticsearch/ElasticsearchFilter.java | 17 +-
> .../elasticsearch/ElasticsearchJson.java | 614 +++++++++++++++++++
> .../elasticsearch/ElasticsearchMethod.java | 13 +-
> .../elasticsearch/ElasticsearchProject.java | 6 +-
> .../adapter/elasticsearch/ElasticsearchRel.java | 66 +-
> .../elasticsearch/ElasticsearchRules.java | 38 +-
> .../elasticsearch/ElasticsearchSchema.java | 30 +-
> .../elasticsearch/ElasticsearchSort.java | 41 +-
> .../elasticsearch/ElasticsearchTable.java | 313 +++++++++-
> .../elasticsearch/ElasticsearchTableScan.java | 6 +-
> .../ElasticsearchToEnumerableConverter.java | 46 +-
> .../elasticsearch/PredicateAnalyzer.java | 10 +
> .../adapter/elasticsearch/QueryBuilders.java | 106 +++-
> .../adapter/elasticsearch/AggregationTest.java | 235 +++++++
> .../adapter/elasticsearch/BooleanLogicTest.java | 1 +
> .../elasticsearch/ElasticSearchAdapterTest.java | 309 +++++++---
> .../elasticsearch/ElasticsearchJsonTest.java | 183 ++++++
> .../EmbeddedElasticsearchPolicy.java | 41 +-
> .../adapter/elasticsearch/Projection2Test.java | 107 ++++
> .../adapter/elasticsearch/ProjectionTest.java | 37 +-
> .../elasticsearch/QueryBuildersTest.java | 63 ++
> .../calcite/test/ElasticsearchChecker.java | 90 ++-
> 27 files changed, 2340 insertions(+), 406 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/pom.xml
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
> index e3a044d..4700fee 100644
> --- a/elasticsearch/pom.xml
> +++ b/elasticsearch/pom.xml
> @@ -124,6 +124,12 @@ limitations under the License.
>        <scope>test</scope>
>      </dependency>
>      <dependency>
> +      <groupId>org.hamcrest</groupId>
> +      <artifactId>hamcrest-core</artifactId>
> +      <version>${hamcrest.version}</version>
> +      <scope>test</scope>
> +    </dependency>
> +    <dependency>
>        <groupId>org.slf4j</groupId>
>        <artifactId>slf4j-api</artifactId>
>      </dependency>
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> deleted file mode 100644
> index 1a0f6d0..0000000
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> +++ /dev/null
> @@ -1,150 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to you under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.calcite.adapter.elasticsearch;
> -
> -import org.apache.calcite.adapter.java.AbstractQueryableTable;
> -import org.apache.calcite.linq4j.Enumerable;
> -import org.apache.calcite.linq4j.Enumerator;
> -import org.apache.calcite.linq4j.QueryProvider;
> -import org.apache.calcite.linq4j.Queryable;
> -import org.apache.calcite.plan.RelOptCluster;
> -import org.apache.calcite.plan.RelOptTable;
> -import org.apache.calcite.rel.RelNode;
> -import org.apache.calcite.rel.type.RelDataType;
> -import org.apache.calcite.rel.type.RelDataTypeFactory;
> -import org.apache.calcite.schema.SchemaPlus;
> -import org.apache.calcite.schema.TranslatableTable;
> -import org.apache.calcite.schema.impl.AbstractTableQueryable;
> -import org.apache.calcite.sql.type.SqlTypeName;
> -
> -import com.fasterxml.jackson.databind.ObjectMapper;
> -
> -import java.util.List;
> -import java.util.Map;
> -import java.util.Objects;
> -
> -/**
> - * Table based on an Elasticsearch type.
> - */
> -abstract class AbstractElasticsearchTable extends AbstractQueryableTable
> -    implements TranslatableTable {
> -
> -  final String indexName;
> -  final String typeName;
> -  final ObjectMapper mapper;
> -
> -  /**
> -   * Creates an ElasticsearchTable.
> -   * @param indexName Elastic Search index
> -   * @param typeName Elastic Search index type
> -   * @param mapper Jackson API to parse (and created) JSON documents
> -   */
> -  AbstractElasticsearchTable(String indexName, String typeName, ObjectMapper mapper) {
> -    super(Object[].class);
> -    this.indexName = Objects.requireNonNull(indexName, "indexName");
> -    this.typeName = Objects.requireNonNull(typeName, "typeName");
> -    this.mapper = Objects.requireNonNull(mapper, "mapper");
> -  }
> -
> -  @Override public String toString() {
> -    return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
> -  }
> -
> -  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
> -    final RelDataType mapType = relDataTypeFactory.createMapType(
> -        relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
> -        relDataTypeFactory.createTypeWithNullability(
> -            relDataTypeFactory.createSqlType(SqlTypeName.ANY),
> -            true));
> -    return relDataTypeFactory.builder().add("_MAP", mapType).build();
> -  }
> -
> -  public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema,
> -      String tableName) {
> -    return new ElasticsearchQueryable<>(queryProvider, schema, this, tableName);
> -  }
> -
> -  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
> -    final RelOptCluster cluster = context.getCluster();
> -    return new ElasticsearchTableScan(cluster, cluster.traitSetOf(ElasticsearchRel.CONVENTION),
> -        relOptTable, this, null);
> -  }
> -
> -  /**
> -   * In ES 5.x scripted fields start with {@code params._source.foo} while in ES2.x
> -   * {@code _source.foo}. Helper method to build correct query based on runtime version of elastic.
> -   * Used to keep backwards compatibility with ES2.
> -   *
> -   * @see <a href="https://github.com/elastic/elasticsearch/issues/20068">_source variable</a>
> -   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html">Scripted Fields</a>
> -   * @return string to be used for scripted fields
> -   */
> -  protected abstract String scriptedFieldPrefix();
> -
> -  /** Executes a "find" operation on the underlying type.
> -   *
> -   * <p>For example,
> -   * <code>client.prepareSearch(index).setTypes(type)
> -   * .setSource("{\"fields\" : [\"state\"]}")</code></p>
> -   *
> -   * @param index Elasticsearch index
> -   * @param ops List of operations represented as Json strings.
> -   * @param fields List of fields to project; or null to return map
> -   * @return Enumerator of results
> -   */
> -  protected abstract Enumerable<Object> find(String index, List<String> ops,
> -      List<Map.Entry<String, Class>> fields);
> -
> -  /**
> -   * Implementation of {@link Queryable} based on
> -   * a {@link AbstractElasticsearchTable}.
> -   *
> -   * @param <T> element type
> -   */
> -  public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
> -    ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
> -        AbstractElasticsearchTable table, String tableName) {
> -      super(queryProvider, schema, table, tableName);
> -    }
> -
> -    public Enumerator<T> enumerator() {
> -      return null;
> -    }
> -
> -    private String getIndex() {
> -      return schema.unwrap(ElasticsearchSchema.class).getIndex();
> -    }
> -
> -    private AbstractElasticsearchTable getTable() {
> -      return (AbstractElasticsearchTable) table;
> -    }
> -
> -    /** Called via code-generation.
> -     * @param ops list of queries (as strings)
> -     * @param fields projection
> -     * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
> -     * @return result as enumerable
> -     */
> -    @SuppressWarnings("UnusedDeclaration")
> -    public Enumerable<Object> find(List<String> ops,
> -         List<Map.Entry<String, Class>> fields) {
> -      return getTable().find(getIndex(), ops, fields);
> -    }
> -  }
> -}
> -
> -// End AbstractElasticsearchTable.java
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> new file mode 100644
> index 0000000..9627aca
> --- /dev/null
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> @@ -0,0 +1,165 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to you under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.calcite.adapter.elasticsearch;
> +
> +import org.apache.calcite.plan.RelOptCluster;
> +import org.apache.calcite.plan.RelOptCost;
> +import org.apache.calcite.plan.RelOptPlanner;
> +import org.apache.calcite.plan.RelTraitSet;
> +import org.apache.calcite.rel.InvalidRelException;
> +import org.apache.calcite.rel.RelNode;
> +import org.apache.calcite.rel.core.Aggregate;
> +import org.apache.calcite.rel.core.AggregateCall;
> +import org.apache.calcite.rel.metadata.RelMetadataQuery;
> +import org.apache.calcite.rel.type.RelDataType;
> +import org.apache.calcite.rel.type.RelDataTypeField;
> +import org.apache.calcite.sql.SqlKind;
> +import org.apache.calcite.util.ImmutableBitSet;
> +
> +import java.util.ArrayList;
> +import java.util.EnumSet;
> +import java.util.List;
> +import java.util.Locale;
> +import java.util.Set;
> +
> +/**
> + * Implementation of
> + * {@link org.apache.calcite.rel.core.Aggregate} relational expression
> + * for ElasticSearch.
> + */
> +public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRel {
> +
> +  private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
> +      EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG, SqlKind.SUM);
> +
> +  /** Creates a ElasticsearchAggregate */
> +  ElasticsearchAggregate(RelOptCluster cluster,
> +      RelTraitSet traitSet,
> +      RelNode input,
> +      boolean indicator,
> +      ImmutableBitSet groupSet,
> +      List<ImmutableBitSet> groupSets,
> +      List<AggregateCall> aggCalls) throws InvalidRelException {
> +    super(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls);
> +
> +    if (getConvention() != input.getConvention()) {
> +      String message = String.format(Locale.ROOT, "%s != %s", getConvention(),
> +          input.getConvention());
> +      throw new AssertionError(message);
> +    }
> +
> +    assert getConvention() == input.getConvention();
> +    assert getConvention() == ElasticsearchRel.CONVENTION;
> +    assert this.groupSets.size() == 1 : "Grouping sets not supported";
> +
> +    for (AggregateCall aggCall : aggCalls) {
> +      if (aggCall.isDistinct()) {
> +        throw new InvalidRelException("distinct aggregation not supported");
> +      }
> +
> +      SqlKind kind = aggCall.getAggregation().getKind();
> +      if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
> +        final String message = String.format(Locale.ROOT,
> +            "Aggregation %s not supported (use one of %s)", kind, SUPPORTED_AGGREGATIONS);
> +        throw new InvalidRelException(message);
> +      }
> +    }
> +
> +    if (getGroupType() != Group.SIMPLE) {
> +      final String message = String.format(Locale.ROOT, "Only %s grouping is supported. "
> +          + "Yours is %s", Group.SIMPLE, getGroupType());
> +      throw new InvalidRelException(message);
> +    }
> +
> +  }
> +
> +  @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
> +      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
> +      List<AggregateCall> aggCalls) {
> +    try {
> +      return new ElasticsearchAggregate(getCluster(), traitSet, input,
> +          indicator, groupSet, groupSets,
> +          aggCalls);
> +    } catch (InvalidRelException e) {
> +      throw new AssertionError(e);
> +    }
> +  }
> +
> +  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
> +    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
> +  }
> +
> +  @Override public void implement(Implementor implementor) {
> +    implementor.visitChild(0, getInput());
> +    List<String> inputFields = fieldNames(getInput().getRowType());
> +
> +    for (int group : groupSet) {
> +      implementor.addGroupBy(inputFields.get(group));
> +    }
> +
> +    for (AggregateCall aggCall : aggCalls) {
> +      List<String> names = new ArrayList<>();
> +      for (int i : aggCall.getArgList()) {
> +        names.add(inputFields.get(i));
> +      }
> +
> +      final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
> +
> +      String op = String.format(Locale.ROOT, "\"%s\":{\"field\": \"%s\"}",
> +          toElasticAggregate(aggCall),
> +          name);
> +
> +      implementor.addAggregation(aggCall.getName(), op);
> +    }
> +  }
> +
> +  /**
> +   * Most of the aggregations can be retrieved with single
> +   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html">stats</a>
> +   * function. But currently only one-to-one mapping is supported between sql agg and elastic
> +   * aggregation.
> +   */
> +  private String toElasticAggregate(AggregateCall call) {
> +    SqlKind kind = call.getAggregation().getKind();
> +    switch (kind) {
> +    case COUNT:
> +      return call.isApproximate() ? "cardinality" : "value_count";
> +    case SUM:
> +      return "sum";
> +    case MIN:
> +      return "min";
> +    case MAX:
> +      return "max";
> +    case AVG:
> +      return "avg";
> +    default:
> +      throw new IllegalArgumentException("Unknown aggregation kind " + kind + " for " + call);
> +    }
> +  }
> +
> +  private List<String> fieldNames(RelDataType relDataType) {
> +    List<String> names = new ArrayList<>();
> +
> +    for (RelDataTypeField rdtf : relDataType.getFieldList()) {
> +      names.add(rdtf.getName());
> +    }
> +    return names;
> +  }
> +
> +}
> +
> +// End ElasticsearchAggregate.java
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> index ed628cc..2c4c42c 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> @@ -30,18 +30,9 @@ interface ElasticsearchConstants {
>    String FIELDS = "fields";
>    String SOURCE_PAINLESS = "params._source";
>    String SOURCE_GROOVY = "_source";
> -  String SOURCE = SOURCE_GROOVY;
>    String ID = "_id";
>    String UID = "_uid";
>
> -  /* Aggregation pushdown operations supported */
> -  String AGG_SUM = "SUM";
> -  String AGG_SUM0 = "$SUM0";
> -  String AGG_COUNT = "COUNT";
> -  String AGG_MIN = "MIN";
> -  String AGG_MAX = "MAX";
> -  String AGG_AVG = "AVG";
> -
>    Set<String> META_COLUMNS = ImmutableSet.of(UID, ID, TYPE, INDEX);
>
>  }
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> index d87de7e..16ac92d 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> @@ -26,27 +26,27 @@ import java.util.Map;
>
>  /**
>   * Util functions which convert
> - * {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit}
> + * {@link ElasticsearchJson.SearchHit}
>   * into calcite specific return type (map, object[], list etc.)
>   */
>  class ElasticsearchEnumerators {
>
>    private ElasticsearchEnumerators() {}
>
> -  private static Function1<ElasticsearchSearchResult.SearchHit, Map> mapGetter() {
> -    return new Function1<ElasticsearchSearchResult.SearchHit, Map>() {
> -      public Map apply(ElasticsearchSearchResult.SearchHit hits) {
> +  private static Function1<ElasticsearchJson.SearchHit, Map> mapGetter() {
> +    return new Function1<ElasticsearchJson.SearchHit, Map>() {
> +      public Map apply(ElasticsearchJson.SearchHit hits) {
>          return hits.sourceOrFields();
>        }
>      };
>    }
>
> -  private static Function1<ElasticsearchSearchResult.SearchHit, Object> singletonGetter(
> +  private static Function1<ElasticsearchJson.SearchHit, Object> singletonGetter(
>        final String fieldName,
>        final Class fieldClass) {
> -    return new Function1<ElasticsearchSearchResult.SearchHit, Object>() {
> -      public Object apply(ElasticsearchSearchResult.SearchHit hits) {
> -        return convert(hits.sourceOrFields(), fieldClass);
> +    return new Function1<ElasticsearchJson.SearchHit, Object>() {
> +      public Object apply(ElasticsearchJson.SearchHit hits) {
> +        return convert(hits.valueOrNull(fieldName), fieldClass);
>        }
>      };
>    }
> @@ -59,30 +59,38 @@ class ElasticsearchEnumerators {
>     *
>     * @return function that converts the search result into a generic array
>     */
> -  private static Function1<ElasticsearchSearchResult.SearchHit, Object[]> listGetter(
> +  private static Function1<ElasticsearchJson.SearchHit, Object[]> listGetter(
>        final List<Map.Entry<String, Class>> fields) {
> -    return new Function1<ElasticsearchSearchResult.SearchHit, Object[]>() {
> -      public Object[] apply(ElasticsearchSearchResult.SearchHit hit) {
> +    return new Function1<ElasticsearchJson.SearchHit, Object[]>() {
> +      public Object[] apply(ElasticsearchJson.SearchHit hit) {
>          Object[] objects = new Object[fields.size()];
>          for (int i = 0; i < fields.size(); i++) {
>            final Map.Entry<String, Class> field = fields.get(i);
>            final String name = field.getKey();
>            final Class type = field.getValue();
> -          objects[i] = convert(hit.value(name), type);
> +          objects[i] = convert(hit.valueOrNull(name), type);
>          }
>          return objects;
>        }
>      };
>    }
>
> -  static Function1<ElasticsearchSearchResult.SearchHit, Object> getter(
> +  static Function1<ElasticsearchJson.SearchHit, Object> getter(
>        List<Map.Entry<String, Class>> fields) {
>      //noinspection unchecked
> -    return fields == null
> -      ? (Function1) mapGetter()
> -      : fields.size() == 1
> -      ? singletonGetter(fields.get(0).getKey(), fields.get(0).getValue())
> -      : (Function1) listGetter(fields);
> +    final Function1 getter;
> +    if (fields == null || fields.size() == 1 && "_MAP".equals(fields.get(0).getKey())) {
> +      // select * from table
> +      getter = mapGetter();
> +    } else if (fields.size() == 1) {
> +      // select foo from table
> +      getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue());
> +    } else {
> +      // select a, b, c from table
> +      getter = listGetter(fields);
> +    }
> +
> +    return getter;
>    }
>
>    private static Object convert(Object o, Class clazz) {
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> index 4d187b1..c339671 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> @@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
>  import org.apache.calcite.plan.RelTraitSet;
>  import org.apache.calcite.rel.RelNode;
>  import org.apache.calcite.rel.core.Filter;
> -import org.apache.calcite.rel.core.Project;
>  import org.apache.calcite.rel.metadata.RelMetadataQuery;
>  import org.apache.calcite.rex.RexCall;
>  import org.apache.calcite.rex.RexInputRef;
> @@ -70,24 +69,13 @@ public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
>
>    @Override public void implement(Implementor implementor) {
>      implementor.visitChild(0, getInput());
> -    List<String> fieldNames;
> -    if (input instanceof Project) {
> -      final List<RexNode> projects = ((Project) input).getProjects();
> -      fieldNames = new ArrayList<>(projects.size());
> -      for (RexNode project : projects) {
> -        String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
> -        fieldNames.add(name);
> -      }
> -    } else {
> -      fieldNames = ElasticsearchRules.elasticsearchFieldNames(getRowType());
> -    }
>      ObjectMapper mapper = implementor.elasticsearchTable.mapper;
>      PredicateAnalyzerTranslator translator = new PredicateAnalyzerTranslator(mapper);
>      try {
>        implementor.add(translator.translateMatch(condition));
>      } catch (IOException e) {
>        throw new UncheckedIOException(e);
> -    } catch (ExpressionNotAnalyzableException e) {
> +    } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) {
>        throw new RuntimeException(e);
>      }
>    }
> @@ -103,7 +91,8 @@ public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
>        this.mapper = Objects.requireNonNull(mapper, "mapper");
>      }
>
> -    String translateMatch(RexNode condition) throws IOException, ExpressionNotAnalyzableException {
> +    String translateMatch(RexNode condition) throws IOException,
> +        PredicateAnalyzer.ExpressionNotAnalyzableException {
>
>        StringWriter writer = new StringWriter();
>        JsonGenerator generator = mapper.getFactory().createGenerator(writer);
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> new file mode 100644
> index 0000000..7c80e82
> --- /dev/null
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> @@ -0,0 +1,614 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to you under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.calcite.adapter.elasticsearch;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.core.JsonParser;
> +import com.fasterxml.jackson.core.JsonProcessingException;
> +import com.fasterxml.jackson.databind.DeserializationContext;
> +import com.fasterxml.jackson.databind.JsonNode;
> +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
> +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
> +import com.fasterxml.jackson.databind.node.ArrayNode;
> +import com.fasterxml.jackson.databind.node.JsonNodeFactory;
> +import com.fasterxml.jackson.databind.node.ObjectNode;
> +
> +import java.io.IOException;
> +import java.time.Duration;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.Collections;
> +import java.util.HashSet;
> +import java.util.Iterator;
> +import java.util.LinkedHashMap;
> +import java.util.List;
> +import java.util.Locale;
> +import java.util.Map;
> +import java.util.Objects;
> +import java.util.Set;
> +import java.util.function.BiConsumer;
> +import java.util.function.Consumer;
> +import java.util.stream.StreamSupport;
> +
> +import static java.util.Collections.unmodifiableMap;
> +
> +/**
> + * Internal objects (and deserializers) used to parse elastic search results
> + * (which are in JSON format).
> + *
> + * <p>Since we're using basic row-level rest client http response has to be
> + * processed manually using JSON (jackson) library.
> + */
> +class ElasticsearchJson {
> +
> +  /**
> +   * Used as special aggregation key for missing values (documents which are missing a field).
> +   * Buckets with that value are then converted to {@code null}s in flat tabular format.
> +   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html">Missing Value</a>
> +   */
> +  static final JsonNode MISSING_VALUE = JsonNodeFactory.instance.textNode("__MISSING__");
> +
> +  private ElasticsearchJson() {}
> +
> +  /**
> +   * Visits leaves of the aggregation where all values are stored.
> +   */
> +  static void visitValueNodes(Aggregations aggregations, Consumer<Map<String, Object>> consumer) {
> +    Objects.requireNonNull(aggregations, "aggregations");
> +    Objects.requireNonNull(consumer, "consumer");
> +
> +    List<Bucket> buckets = new ArrayList<>();
> +
> +    Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
> +
> +    BiConsumer<RowKey, MultiValue> cons = (r, v) ->
> +        rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
> +    aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
> +    rows.forEach((k, v) -> {
> +      Map<String, Object> row = new LinkedHashMap<>(k.keys);
> +      v.forEach(val -> row.put(val.getName(), val.value()));
> +      consumer.accept(row);
> +    });
> +  }
> +
> +  /**
> +   * Identifies a calcite row (as in relational algebra)
> +   */
> +  private static class RowKey {
> +    private final Map<String, Object> keys;
> +    private final int hashCode;
> +
> +    private RowKey(final Map<String, Object> keys) {
> +      this.keys = Objects.requireNonNull(keys, "keys");
> +      this.hashCode = Objects.hashCode(keys);
> +    }
> +
> +    private RowKey(List<Bucket> buckets) {
> +      this(toMap(buckets));
> +    }
> +
> +    private static Map<String, Object> toMap(Iterable<Bucket> buckets) {
> +      return StreamSupport.stream(buckets.spliterator(), false)
> +          .collect(LinkedHashMap::new,
> +              (m, v) -> m.put(v.getName(), v.key()),
> +              LinkedHashMap::putAll);
> +    }
> +
> +    @Override public boolean equals(final Object o) {
> +      if (this == o) {
> +        return true;
> +      }
> +      if (o == null || getClass() != o.getClass()) {
> +        return false;
> +      }
> +      final RowKey rowKey = (RowKey) o;
> +      return hashCode == rowKey.hashCode
> +          && Objects.equals(keys, rowKey.keys);
> +    }
> +
> +    @Override public int hashCode() {
> +      return this.hashCode;
> +    }
> +  }
> +
> +  private static void visitValueNodes(Aggregation aggregation, List<Bucket> parents,
> +      BiConsumer<RowKey, MultiValue> consumer) {
> +
> +    if (aggregation instanceof MultiValue) {
> +      // publish one value of the row
> +      RowKey key = new RowKey(parents);
> +      consumer.accept(key, (MultiValue) aggregation);
> +      return;
> +    }
> +
> +    if (aggregation instanceof Bucket) {
> +      Bucket bucket = (Bucket) aggregation;
> +      parents.add(bucket);
> +      bucket.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> +      parents.remove(parents.size() - 1);
> +    } else if (aggregation instanceof HasAggregations) {
> +      HasAggregations children = (HasAggregations) aggregation;
> +      children.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> +    } else if (aggregation instanceof MultiBucketsAggregation) {
> +      MultiBucketsAggregation multi = (MultiBucketsAggregation) aggregation;
> +      multi.buckets().forEach(b -> {
> +        parents.add(b);
> +        b.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> +        parents.remove(parents.size() - 1);
> +      });
> +    }
> +
> +  }
> +
> +  /**
> +   * Response from Elastic
> +   */
> +  @JsonIgnoreProperties(ignoreUnknown = true)
> +  static class Result {
> +    private final SearchHits hits;
> +    private final Aggregations aggregations;
> +    private final long took;
> +
> +    /**
> +     * Constructor for this instance.
> + * @param hits list of matched documents > + * @param aggregations aggregation results (may be null) > + * @param took time taken (in milliseconds) for this query to execute > + */ > + @JsonCreator > + Result(@JsonProperty("hits") SearchHits hits, > + @JsonProperty("aggregations") Aggregations aggregations, > + @JsonProperty("took") long took) { > + this.hits = Objects.requireNonNull(hits, "hits"); > + this.aggregations = aggregations; > + this.took = took; > + } > + > + SearchHits searchHits() { > + return hits; > + } > + > + Aggregations aggregations() { > + return aggregations; > + } > + > + public Duration took() { > + return Duration.ofMillis(took); > + } > + > + } > + > + /** > + * Similar to {@code SearchHits} in ES. Container for {@link SearchHit} > + */ > + @JsonIgnoreProperties(ignoreUnknown = true) > + static class SearchHits { > + > + private final long total; > + private final List<SearchHit> hits; > + > + @JsonCreator > + SearchHits(@JsonProperty("total")final long total, > + @JsonProperty("hits") final List<SearchHit> hits) { > + this.total = total; > + this.hits = Objects.requireNonNull(hits, "hits"); > + } > + > + public List<SearchHit> hits() { > + return this.hits; > + } > + > + public long total() { > + return total; > + } > + > + } > + > + /** > + * Concrete result record which matched the query. Similar to {@code > SearchHit} in ES.
> + */ > + @JsonIgnoreProperties(ignoreUnknown = true) > + static class SearchHit { > + private final String id; > + private final Map<String, Object> source; > + private final Map<String, Object> fields; > + > + @JsonCreator > + SearchHit(@JsonProperty("_id") final String id, > + @JsonProperty("_source") final Map<String, Object> > source, > + @JsonProperty("fields") final Map<String, Object> > fields) { > + this.id = Objects.requireNonNull(id, "id"); > + > + // both can't be null > + if (source == null && fields == null) { > + final String message = String.format(Locale.ROOT, > + "Both '_source' and 'fields' are missing for %s", id); > + throw new IllegalArgumentException(message); > + } > + > + // both can't be non-null > + if (source != null && fields != null) { > + final String message = String.format(Locale.ROOT, > + "Both '_source' and 'fields' are populated (non-null) for %s", > id); > + throw new IllegalArgumentException(message); > + } > + > + this.source = source; > + this.fields = fields; > + } > + > + /** > + * Returns id of this hit (usually document id) > + * @return unique id > + */ > + public String id() { > + return id; > + } > + > + Object valueOrNull(String name) { > + Objects.requireNonNull(name, "name"); > + if (fields != null && fields.containsKey(name)) { > + Object field = fields.get(name); > + if (field instanceof Iterable) { > + // return first element (or null) > + Iterator<?> iter = ((Iterable<?>) field).iterator(); > + return iter.hasNext() ? iter.next() : null; > + } > + > + return field; > + } > + > + return valueFromPath(source, name); > + } > + > + /** > + * Returns property from nested maps given a path like {@code a.b.c}. > + * @param map current map > + * @param path field path(s), optionally with dots ({@code a.b.c}). > + * @return value located at path {@code path} or {@code null} if not > found. 
> + */ > + private static Object valueFromPath(Map<String, Object> map, String > path) { > + if (map == null) { > + return null; > + } > + > + if (map.containsKey(path)) { > + return map.get(path); > + } > + > + // maybe pattern of type a.b.c > + final int index = path.indexOf('.'); > + if (index == -1) { > + return null; > + } > + > + final String prefix = path.substring(0, index); > + final String suffix = path.substring(index + 1); > + > + Object maybeMap = map.get(prefix); > + if (maybeMap instanceof Map) { > + return valueFromPath((Map<String, Object>) maybeMap, suffix); > + } > + > + return null; > + } > + > + Map<String, Object> source() { > + return source; > + } > + > + Map<String, Object> fields() { > + return fields; > + } > + > + Map<String, Object> sourceOrFields() { > + return source != null ? source : fields; > + } > + } > + > + > + /** > + * {@link Aggregation} container. > + */ > + @JsonDeserialize(using = AggregationsDeserializer.class) > + static class Aggregations implements Iterable<Aggregation> { > + > + private final List<? extends Aggregation> aggregations; > + private Map<String, Aggregation> aggregationsAsMap; > + > + Aggregations(List<? extends Aggregation> aggregations) { > + this.aggregations = Objects.requireNonNull(aggregations, > "aggregations"); > + } > + > + /** > + * Iterates over the {@link Aggregation}s. > + */ > + @Override public final Iterator<Aggregation> iterator() { > + return asList().iterator(); > + } > + > + /** > + * The list of {@link Aggregation}s. > + */ > + final List<Aggregation> asList() { > + return Collections.unmodifiableList(aggregations); > + } > + > + /** > + * Returns the {@link Aggregation}s keyed by aggregation name. Lazy init. 
> + */ > + final Map<String, Aggregation> asMap() { > + if (aggregationsAsMap == null) { > + Map<String, Aggregation> map = new > LinkedHashMap<>(aggregations.size()); > + for (Aggregation aggregation : aggregations) { > + map.put(aggregation.getName(), aggregation); > + } > + this.aggregationsAsMap = unmodifiableMap(map); > + } > + return aggregationsAsMap; > + } > + > + /** > + * Returns the aggregation that is associated with the specified name. > + */ > + @SuppressWarnings("unchecked") > + public final <A extends Aggregation> A get(String name) { > + return (A) asMap().get(name); > + } > + > + @Override public final boolean equals(Object obj) { > + if (obj == null || getClass() != obj.getClass()) { > + return false; > + } > + return aggregations.equals(((Aggregations) obj).aggregations); > + } > + > + @Override public final int hashCode() { > + return Objects.hash(getClass(), aggregations); > + } > + > + } > + > + /** > + * Identifies all aggregations > + */ > + interface Aggregation { > + > + /** > + * @return The name of this aggregation. > + */ > + String getName(); > + > + } > + > + /** > + * Allows traversing aggregations tree > + */ > + interface HasAggregations { > + Aggregations getAggregations(); > + } > + > + /** > + * An aggregation that returns multiple buckets > + */ > + static class MultiBucketsAggregation implements Aggregation { > + > + private final String name; > + private final List<Bucket> buckets; > + > + MultiBucketsAggregation(final String name, > + final List<Bucket> buckets) { > + this.name = name; > + this.buckets = buckets; > + } > + > + /** > + * @return The buckets of this aggregation. > + */ > + List<Bucket> buckets() { > + return buckets; > + } > + > + @Override public String getName() { > + return name; > + } > + } > + > + /** > + * A bucket represents a criteria to which all documents that fall in it > adhere to. 
> + * It is also uniquely identified > + * by a key, and can potentially hold sub-aggregations computed over all > documents in it. > + */ > + static class Bucket implements HasAggregations, Aggregation { > + private final Object key; > + private final String name; > + private final Aggregations aggregations; > + > + Bucket(final Object key, > + final String name, > + final Aggregations aggregations) { > + this.key = key; // key can be set after construction > + this.name = Objects.requireNonNull(name, "name"); > + this.aggregations = Objects.requireNonNull(aggregations, > "aggregations"); > + } > + > + /** > + * @return The key associated with the bucket > + */ > + Object key() { > + return key; > + } > + > + /** > + * @return The key associated with the bucket as a string > + */ > + String keyAsString() { > + return Objects.toString(key()); > + } > + > + /** > + * @return The sub-aggregations of this bucket > + */ > + @Override public Aggregations getAggregations() { > + return aggregations; > + } > + > + @Override public String getName() { > + return name; > + } > + } > + > + /** > + * Multi value aggregation like > + * <a > href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html">Stats</a> > + */ > + static class MultiValue implements Aggregation { > + private final String name; > + private final Map<String, Object> values; > + > + MultiValue(final String name, final Map<String, Object> values) { > + this.name = Objects.requireNonNull(name, "name"); > + this.values = Objects.requireNonNull(values, "values"); > + } > + > + @Override public String getName() { > + return name; > + } > + > + Map<String, Object> values() { > + return values; > + } > + > + /** > + * For single value. Returns single value represented by this leaf > aggregation.
> + * @return value corresponding to {@code value} > + */ > + Object value() { > + if (!values().containsKey("value")) { > + throw new IllegalStateException("'value' field not present in this > aggregation"); > + } > + > + return values().get("value"); > + } > + > + } > + > + /** > + * Allows to de-serialize nested aggregation structures. > + */ > + static class AggregationsDeserializer extends > StdDeserializer<Aggregations> { > + > + private static final Set<String> IGNORE_TOKENS = new > HashSet<>(Arrays.asList("meta", > + "buckets", "value", "values", "value_as_string", "doc_count", "key", > "key_as_string")); > + > + AggregationsDeserializer() { > + super(Aggregations.class); > + } > + > + @Override public Aggregations deserialize(final JsonParser parser, > + final DeserializationContext ctxt) > + throws IOException { > + > + ObjectNode node = parser.getCodec().readTree(parser); > + return parseAggregations(parser, node); > + } > + > + private static Aggregations parseAggregations(JsonParser parser, > ObjectNode node) > + throws JsonProcessingException { > + > + List<Aggregation> aggregations = new ArrayList<>(); > + > + Iterable<Map.Entry<String, JsonNode>> iter = node::fields; > + for (Map.Entry<String, JsonNode> entry : iter) { > + final String name = entry.getKey(); > + final JsonNode value = entry.getValue(); > + > + Aggregation agg = null; > + if (value.has("buckets")) { > + agg = parseBuckets(parser, name, (ArrayNode) value.get("buckets")); > + } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) { > + // leaf > + agg = parseValue(parser, name, (ObjectNode) value); > + } > + > + if (agg != null) { > + aggregations.add(agg); > + } > + } > + > + return new Aggregations(aggregations); > + } > + > + > + > + private static MultiValue parseValue(JsonParser parser, String name, > ObjectNode node) > + throws JsonProcessingException { > + > + return new MultiValue(name, parser.getCodec().treeToValue(node, > Map.class)); > + } > + > + private static 
Aggregation parseBuckets(JsonParser parser, String name, > ArrayNode nodes) > + throws JsonProcessingException { > + > + List<Bucket> buckets = new ArrayList<>(nodes.size()); > + for (JsonNode b: nodes) { > + buckets.add(parseBucket(parser, name, (ObjectNode) b)); > + } > + > + return new MultiBucketsAggregation(name, buckets); > + } > + > + /** > + * Determines if current key is a missing field key. Missing key is > returned when document > + * does not have pivoting attribute (example {@code GROUP BY > _MAP['a.b.missing']}). It helps > + * group documents which don't have a field. In relational algebra > this > + * would be {@code null}. > + * > + * @param key current {@code key} (usually string) as returned by ES > + * @return {@code true} if this value is the missing-bucket marker > + * @see #MISSING_VALUE > + */ > + private static boolean isMissingBucket(JsonNode key) { > + return MISSING_VALUE.equals(key); > + } > + > + private static Bucket parseBucket(JsonParser parser, String name, > ObjectNode node) > + throws JsonProcessingException { > + > + final JsonNode keyNode = node.get("key"); > + final Object key; > + if (isMissingBucket(keyNode) || keyNode.isNull()) { > + key = null; > + } else if (keyNode.isTextual()) { > + key = keyNode.textValue(); > + } else if (keyNode.isNumber()) { > + key = keyNode.numberValue(); > + } else if (keyNode.isBoolean()) { > + key = keyNode.booleanValue(); > + } else { > + // don't usually expect keys to be Objects > + key = parser.getCodec().treeToValue(keyNode, Map.class); > + } > + > + return new Bucket(key, name, parseAggregations(parser, node)); > + } > + > + } > +} > + > +// End ElasticsearchJson.java > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > ---------------------------------------------------------------------- > diff --git > 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > >
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > index 72753e6..709156f 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > @@ -27,8 +27,17 @@ import java.util.List; > * Builtin methods in the Elasticsearch adapter. > */ > enum ElasticsearchMethod { > - > ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class, > - "find", List.class, List.class); > + > + > ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class, > + "find", > + List.class, // ops - projections and other stuff > + List.class, // fields > + List.class, // sort > + List.class, // groupBy > + List.class, // aggregations > + Long.class, // offset > + Long.class // fetch > + ); > > public final Method method; > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > index 7d5811c..d0841c3 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > @@ -101,11 +101,7 @@ public class ElasticsearchProject extends Project > implements ElasticsearchRel { > query.append("\"script_fields\": {" + String.join(", ", scriptFields) + > "}"); > } > > - for (String opfield : implementor.list) { > - if (opfield.startsWith("\"_source\"")) { > - implementor.list.remove(opfield); > - } > - } > + implementor.list.removeIf(l 
-> l.startsWith("\"_source\"")); > implementor.add(query.toString()); > } > } > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > index 436adf9..1dad691 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > @@ -18,10 +18,14 @@ package org.apache.calcite.adapter.elasticsearch; > > import org.apache.calcite.plan.Convention; > import org.apache.calcite.plan.RelOptTable; > +import org.apache.calcite.rel.RelFieldCollation; > import org.apache.calcite.rel.RelNode; > +import org.apache.calcite.util.Pair; > > import java.util.ArrayList; > import java.util.List; > +import java.util.Map; > +import java.util.Objects; > > /** > * Relational expression that uses Elasticsearch calling convention. > @@ -39,19 +43,75 @@ public interface ElasticsearchRel extends RelNode { > * {@link ElasticsearchRel} nodes into an Elasticsearch query. > */ > class Implementor { > + > final List<String> list = new ArrayList<>(); > > + /** > + * Sorting clauses. > + * @see <a > href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html">Sort</a> > + */ > + final List<Map.Entry<String, RelFieldCollation.Direction>> sort = new > ArrayList<>(); > + > + /** > + * Elastic aggregation ({@code MIN / MAX / COUNT} etc.) statements > (functions). 
> + * @see <a > href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html">aggregations</a> > + */ > + final List<Map.Entry<String, String>> aggregations = new ArrayList<>(); > + > + /** > + * Allows bucketing documents together. Similar to {@code select ... > from table group by field1} > + * @see <a > href="https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-bucket.html">Bucket > Aggregations</a> > + */ > + final List<String> groupBy = new ArrayList<>(); > + > + /** > + * Starting index (default {@code 0}). Equivalent to {@code from} in ES > query. > + * @see <a > href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html">From/Size</a> > + */ > + Long offset; > + > + /** > + * Number of records to return. Equivalent to {@code size} in ES query. > + * @see <a > href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html">From/Size</a> > + */ > + Long fetch; > + > RelOptTable table; > - AbstractElasticsearchTable elasticsearchTable; > + ElasticsearchTable elasticsearchTable; > > - public void add(String findOp) { > + void add(String findOp) { > list.add(findOp); > } > > - public void visitChild(int ordinal, RelNode input) { > + void addGroupBy(String field) { > + Objects.requireNonNull(field, "field"); > + groupBy.add(field); > + } > + > + void addSort(String field, RelFieldCollation.Direction direction) { > + Objects.requireNonNull(field, "field"); > + sort.add(new Pair<>(field, direction)); > + } > + > + void addAggregation(String field, String expression) { > + Objects.requireNonNull(field, "field"); > + Objects.requireNonNull(expression, "expression"); > + aggregations.add(new Pair<>(field, expression)); > + } > + > + void offset(long offset) { > + this.offset = offset; > + } > + > + void fetch(long fetch) { > + this.fetch = fetch; > + } > + > + void visitChild(int ordinal, RelNode input) { > assert ordinal == 0; >
((ElasticsearchRel) input).implement(this); > } > + > } > } > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > index 97e934c..b442ddd 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > @@ -23,10 +23,12 @@ import org.apache.calcite.plan.Convention; > import org.apache.calcite.plan.RelOptRule; > import org.apache.calcite.plan.RelTrait; > import org.apache.calcite.plan.RelTraitSet; > +import org.apache.calcite.rel.InvalidRelException; > import org.apache.calcite.rel.RelCollations; > import org.apache.calcite.rel.RelNode; > import org.apache.calcite.rel.convert.ConverterRule; > import org.apache.calcite.rel.core.Sort; > +import org.apache.calcite.rel.logical.LogicalAggregate; > import org.apache.calcite.rel.logical.LogicalFilter; > import org.apache.calcite.rel.logical.LogicalProject; > import org.apache.calcite.rel.type.RelDataType; > @@ -53,7 +55,8 @@ class ElasticsearchRules { > static final RelOptRule[] RULES = { > ElasticsearchSortRule.INSTANCE, > ElasticsearchFilterRule.INSTANCE, > - ElasticsearchProjectRule.INSTANCE > + ElasticsearchProjectRule.INSTANCE, > + ElasticsearchAggregateRule.INSTANCE > }; > > private ElasticsearchRules() {} > @@ -147,7 +150,7 @@ class ElasticsearchRules { > } > } > throw new IllegalArgumentException("Translation of " + call.toString() > - + "is not supported by ElasticsearchProject"); > + + " is not supported by ElasticsearchProject"); > } > > List<String> visitList(List<RexNode> list) { > 
@@ -217,6 +220,37 @@ class ElasticsearchRules { > } > > /** > + * Rule to convert an {@link > org.apache.calcite.rel.logical.LogicalAggregate} > + * to an {@link ElasticsearchAggregate}. > + */ > + private static class ElasticsearchAggregateRule extends > ElasticsearchConverterRule { > + static final RelOptRule INSTANCE = new ElasticsearchAggregateRule(); > + > + private ElasticsearchAggregateRule() { > + super(LogicalAggregate.class, Convention.NONE, > ElasticsearchRel.CONVENTION, > + "ElasticsearchAggregateRule"); > + } > + > + public RelNode convert(RelNode rel) { > + final LogicalAggregate agg = (LogicalAggregate) rel; > + final RelTraitSet traitSet = agg.getTraitSet().replace(out); > + try { > + return new ElasticsearchAggregate( > + rel.getCluster(), > + traitSet, > + convert(agg.getInput(), traitSet.simplify()), > + agg.indicator, > + agg.getGroupSet(), > + agg.getGroupSets(), > + agg.getAggCallList()); > + } catch (InvalidRelException e) { > + return null; > + } > + } > + } > + > + > + /** > * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject} > * to an {@link ElasticsearchProject}. 
> */ > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > index 1c630ad..80a94be 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > @@ -30,6 +30,7 @@ import org.elasticsearch.client.RestClient; > import java.io.IOException; > import java.io.InputStream; > import java.io.UncheckedIOException; > +import java.util.Collections; > import java.util.Locale; > import java.util.Map; > import java.util.Objects; > @@ -48,6 +49,8 @@ public class ElasticsearchSchema extends AbstractSchema { > > private final ObjectMapper mapper; > > + private final Map<String, Table> tableMap; > + > /** > * Allows schema to be instantiated from existing elastic search client. > * This constructor is used in tests. 
> @@ -56,20 +59,33 @@ public class ElasticsearchSchema extends AbstractSchema { > * @param index name of ES index > */ > public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String > index) { > + this(client, mapper, index, null); > + } > + > + public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String > index, String type) { > super(); > this.client = Objects.requireNonNull(client, "client"); > this.mapper = Objects.requireNonNull(mapper, "mapper"); > this.index = Objects.requireNonNull(index, "index"); > + if (type == null) { > + try { > + this.tableMap = createTables(listTypesFromElastic()); > + } catch (IOException e) { > + throw new UncheckedIOException("Couldn't get types for " + index, e); > + } > + } else { > + this.tableMap = createTables(Collections.singleton(type)); > + } > } > > @Override protected Map<String, Table> getTableMap() { > + return tableMap; > + } > + > + private Map<String, Table> createTables(Iterable<String> types) { > final ImmutableMap.Builder<String, Table> builder = > ImmutableMap.builder(); > - try { > - for (String type: listTypes()) { > - builder.put(type, new ElasticsearchTable(client, mapper, index, > type)); > - } > - } catch (IOException e) { > - throw new UncheckedIOException("Failed to get types for " + index, e); > + for (String type : types) { > + builder.put(type, new ElasticsearchTable(client, mapper, index, type)); > } > return builder.build(); > } > @@ -81,7 +97,7 @@ public class ElasticsearchSchema extends AbstractSchema { > * @throws IOException for any IO related issues > * @throws IllegalStateException if reply is not understood > */ > - private Set<String> listTypes() throws IOException { > + private Set<String> listTypesFromElastic() throws IOException { > final String endpoint = "/" + index + "/_mapping"; > final Response response = client.performRequest("GET", endpoint); > try (InputStream is = response.getEntity().getContent()) { > > 
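Inline note on the ElasticsearchSchema hunk above: the table map is now built once, in the constructor. An explicit type yields a single table; otherwise types are discovered via `GET /{index}/_mapping`. Below is a minimal, dependency-free sketch of just that selection logic. It is not the adapter's code: `SchemaTablesSketch` is a hypothetical name, a `Supplier<Set<String>>` stands in for `listTypesFromElastic()`, and plain strings stand in for `ElasticsearchTable` instances.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical model of the new eager table resolution in ElasticsearchSchema. */
final class SchemaTablesSketch {
  private final Map<String, String> tableMap;

  /**
   * @param index      index name (stands in for the ES index)
   * @param type       explicit type, or null to discover all types
   * @param typeLister stand-in for listTypesFromElastic(); only called when type is null
   */
  SchemaTablesSketch(String index, String type, Supplier<Set<String>> typeLister) {
    // Discover types only when no explicit type was supplied.
    this.tableMap = type == null
        ? createTables(index, typeLister.get())
        : createTables(index, Collections.singleton(type));
  }

  private static Map<String, String> createTables(String index, Iterable<String> types) {
    final Map<String, String> tables = new LinkedHashMap<>();
    for (String type : types) {
      // The real code puts a new ElasticsearchTable here; a label is enough for the sketch.
      tables.put(type, index + "/" + type);
    }
    return Collections.unmodifiableMap(tables);
  }

  /** Returns the map computed at construction; no lookup happens here. */
  Map<String, String> getTableMap() {
    return tableMap;
  }
}
```

The practical effect, as the diff shows, is that `getTableMap()` no longer issues a `_mapping` request on every call, and passing a type skips the request entirely.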
http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > index ed669aa..9078b72 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > @@ -23,15 +23,12 @@ import org.apache.calcite.plan.RelTraitSet; > import org.apache.calcite.rel.RelCollation; > import org.apache.calcite.rel.RelFieldCollation; > import org.apache.calcite.rel.RelNode; > -import org.apache.calcite.rel.core.Project; > import org.apache.calcite.rel.core.Sort; > import org.apache.calcite.rel.metadata.RelMetadataQuery; > import org.apache.calcite.rel.type.RelDataTypeField; > import org.apache.calcite.rex.RexLiteral; > import org.apache.calcite.rex.RexNode; > -import org.apache.calcite.util.Util; > > -import java.util.ArrayList; > import java.util.List; > > /** > @@ -57,48 +54,22 @@ public class ElasticsearchSort extends Sort implements > ElasticsearchRel { > > @Override public void implement(Implementor implementor) { > implementor.visitChild(0, getInput()); > - if (!collation.getFieldCollations().isEmpty()) { > - final List<String> keys = new ArrayList<>(); > - if (input instanceof Project) { > - final List<RexNode> projects = ((Project) input).getProjects(); > + final List<RelDataTypeField> fields = getRowType().getFieldList(); > > - for (RelFieldCollation fieldCollation : > collation.getFieldCollations()) { > - RexNode project = projects.get(fieldCollation.getFieldIndex()); > - String name = project.accept(MapProjectionFieldVisitor.INSTANCE); > - 
keys.add(ElasticsearchRules.quote(name) + ": " + > direction(fieldCollation)); > - } > - } else { > - final List<RelDataTypeField> fields = getRowType().getFieldList(); > - > - for (RelFieldCollation fieldCollation : > collation.getFieldCollations()) { > - final String name = > fields.get(fieldCollation.getFieldIndex()).getName(); > - keys.add(ElasticsearchRules.quote(name) + ": " + > direction(fieldCollation)); > - } > - } > - > - implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") > + "]"); > + for (RelFieldCollation fieldCollation : collation.getFieldCollations()) { > + final String name = > fields.get(fieldCollation.getFieldIndex()).getName(); > + implementor.addSort(name, fieldCollation.getDirection()); > } > > if (offset != null) { > - implementor.add("\"from\": " + ((RexLiteral) offset).getValue()); > + implementor.offset(((RexLiteral) offset).getValueAs(Long.class)); > } > > if (fetch != null) { > - implementor.add("\"size\": " + ((RexLiteral) fetch).getValue()); > + implementor.fetch(((RexLiteral) fetch).getValueAs(Long.class)); > } > } > > - private String direction(RelFieldCollation fieldCollation) { > - switch (fieldCollation.getDirection()) { > - case DESCENDING: > - case STRICTLY_DESCENDING: > - return "\"desc\""; > - case ASCENDING: > - case STRICTLY_ASCENDING: > - default: > - return "\"asc\""; > - } > - } > } > > // End ElasticsearchSort.java > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java > index 955636e..c404da7 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java > +++ > 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java > @@ -16,9 +16,24 @@ > */ > package org.apache.calcite.adapter.elasticsearch; > > +import org.apache.calcite.adapter.java.AbstractQueryableTable; > import org.apache.calcite.linq4j.Enumerable; > +import org.apache.calcite.linq4j.Enumerator; > import org.apache.calcite.linq4j.Linq4j; > +import org.apache.calcite.linq4j.QueryProvider; > +import org.apache.calcite.linq4j.Queryable; > import org.apache.calcite.linq4j.function.Function1; > +import org.apache.calcite.plan.RelOptCluster; > +import org.apache.calcite.plan.RelOptTable; > +import org.apache.calcite.rel.RelFieldCollation; > +import org.apache.calcite.rel.RelNode; > +import org.apache.calcite.rel.type.RelDataType; > +import org.apache.calcite.rel.type.RelDataTypeFactory; > +import org.apache.calcite.runtime.Hook; > +import org.apache.calcite.schema.SchemaPlus; > +import org.apache.calcite.schema.TranslatableTable; > +import org.apache.calcite.schema.impl.AbstractTableQueryable; > +import org.apache.calcite.sql.type.SqlTypeName; > import org.apache.calcite.util.Util; > > import org.apache.http.HttpEntity; > @@ -29,26 +44,47 @@ import org.apache.http.util.EntityUtils; > > import com.fasterxml.jackson.databind.JsonNode; > import com.fasterxml.jackson.databind.ObjectMapper; > +import com.fasterxml.jackson.databind.node.ArrayNode; > +import com.fasterxml.jackson.databind.node.ObjectNode; > > import org.elasticsearch.client.Response; > import org.elasticsearch.client.RestClient; > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > > import java.io.IOException; > import java.io.InputStream; > import java.io.UncheckedIOException; > +import java.util.ArrayList; > import java.util.Collections; > +import java.util.HashMap; > +import java.util.LinkedHashMap; > +import java.util.LinkedHashSet; > import java.util.List; > import java.util.Locale; > import java.util.Map; > import java.util.Objects; > +import 
java.util.Set; > +import java.util.function.Predicate; > +import java.util.stream.Collectors; > > /** > * Table based on an Elasticsearch type. > */ > -public class ElasticsearchTable extends AbstractElasticsearchTable { > +public class ElasticsearchTable extends AbstractQueryableTable implements > TranslatableTable { > + > + private static final Logger LOGGER = > LoggerFactory.getLogger(ElasticsearchTable.class); > + > + /** > + * Used for constructing (possibly nested) Elastic aggregation nodes. > + */ > + private static final String AGGREGATIONS = "aggregations"; > + > private final RestClient restClient; > private final ElasticsearchVersion version; > - > + private final String indexName; > + private final String typeName; > + final ObjectMapper mapper; > > /** > * Creates an ElasticsearchTable. > @@ -58,7 +94,7 @@ public class ElasticsearchTable extends > AbstractElasticsearchTable { > * @param typeName elastic searh index type > */ > ElasticsearchTable(RestClient client, ObjectMapper mapper, String > indexName, String typeName) { > - super(indexName, typeName, Objects.requireNonNull(mapper, "mapper")); > + super(Object[].class); > this.restClient = Objects.requireNonNull(client, "client"); > try { > this.version = detectVersion(client, mapper); > @@ -67,6 +103,9 @@ public class ElasticsearchTable extends > AbstractElasticsearchTable { > + "for %s/%s", indexName, typeName); > throw new UncheckedIOException(message, e); > } > + this.indexName = Objects.requireNonNull(indexName, "indexName"); > + this.typeName = Objects.requireNonNull(typeName, "typeName"); > + this.mapper = Objects.requireNonNull(mapper, "mapper"); > > } > > @@ -87,37 +126,211 @@ public class ElasticsearchTable extends > AbstractElasticsearchTable { > return > ElasticsearchVersion.fromString(node.get("version").get("number").asText()); > } > > - @Override protected String scriptedFieldPrefix() { > + /** > + * In ES 5.x scripted fields start with {@code params._source.foo} while > in ES2.x > + * 
{@code _source.foo}. Helper method to build correct query based on > runtime version of elastic. > + * Used to keep backwards compatibility with ES2. > + * > + * @see <a > href="https://github.com/elastic/elasticsearch/issues/20068">_source > variable</a> > + * @see <a > href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html">Scripted > Fields</a> > + * @return string to be used for scripted fields > + */ > + String scriptedFieldPrefix() { > // ES2 vs ES5 scripted field difference > return version == ElasticsearchVersion.ES2 > ? ElasticsearchConstants.SOURCE_GROOVY > : ElasticsearchConstants.SOURCE_PAINLESS; > } > > - @Override protected Enumerable<Object> find(String index, List<String> ops, > - List<Map.Entry<String, Class>> fields) { > + /** > + * Executes a "find" operation on the underlying type. > + * > + * <p>For example, > + * <code>client.prepareSearch(index).setTypes(type) > + * .setSource("{\"fields\" : [\"state\"]}")</code></p> > + * > + * @param ops List of operations represented as Json strings. 
> + * @param fields List of fields to project; or null to return map > + * @param sort list of fields to sort and their direction (asc/desc) > + * @param aggregations aggregation functions > + * @return Enumerator of results > + */ > + protected Enumerable<Object> find(List<String> ops, > + List<Map.Entry<String, Class>> fields, > + List<Map.Entry<String, RelFieldCollation.Direction>> sort, > + List<String> groupBy, > + List<Map.Entry<String, String>> aggregations, > + Long offset, Long fetch) throws IOException { > + > + if (!aggregations.isEmpty()) { > + // process aggregations separately > + return aggregate(ops, fields, sort, groupBy, aggregations, offset, > fetch); > + } > + > + final ObjectNode query = mapper.createObjectNode(); > > - final String query; > - if (!ops.isEmpty()) { > - query = "{" + Util.toString(ops, "", ", ", "") + "}"; > - } else { > - query = "{}"; > + // manually parse from previously concatenated string > + query.setAll( > + (ObjectNode) mapper.readTree("{" > + + Util.toString(ops, "", ", ", "") + "}")); > + > + if (!sort.isEmpty()) { > + ArrayNode sortNode = query.withArray("sort"); > + sort.forEach(e -> > + sortNode.add( > + mapper.createObjectNode().put(e.getKey(), > e.getValue().isDescending() ? 
"desc" : "asc")) > + ); > + } > + > + if (offset != null) { > + query.put("from", offset); > + } > + > + if (fetch != null) { > + query.put("size", fetch); > } > > try { > - ElasticsearchSearchResult result = httpRequest(query); > - final Function1<ElasticsearchSearchResult.SearchHit, Object> getter = > + ElasticsearchJson.Result search = httpRequest(query); > + final Function1<ElasticsearchJson.SearchHit, Object> getter = > ElasticsearchEnumerators.getter(fields); > - return Linq4j.asEnumerable(result.searchHits().hits()).select(getter); > + return Linq4j.asEnumerable(search.searchHits().hits()).select(getter); > } catch (IOException e) { > throw new UncheckedIOException(e); > } > } > > - private ElasticsearchSearchResult httpRequest(String query) throws > IOException { > + private Enumerable<Object> aggregate(List<String> ops, > + List<Map.Entry<String, Class>> fields, > + List<Map.Entry<String, RelFieldCollation.Direction>> sort, > + List<String> groupBy, > + List<Map.Entry<String, String>> aggregations, > + Long offset, Long fetch) throws IOException { > + > + if (aggregations.isEmpty()) { > + throw new IllegalArgumentException("Missing Aggregations"); > + } > + > + if (!groupBy.isEmpty() && offset != null) { > + String message = "Currently ES doesn't support generic pagination " > + + "with aggregations. You can still use LIMIT keyword (without > OFFSET). 
" > + + "For more details see > https://github.com/elastic/elasticsearch/issues/4915"; > + throw new IllegalStateException(message); > + } > + > + final ObjectNode query = mapper.createObjectNode(); > + > + // manually parse into JSON from previously concatenated strings > + query.setAll((ObjectNode) mapper.readTree("{" + Util.toString(ops, "", > ", ", "") + "}")); > + > + // remove / override attributes which are not applicable to aggregations > + query.put("_source", false); > + query.put("size", 0); > + query.remove("script_fields"); > + > + // allows to detect aggregation for count(*) > + final Predicate<Map.Entry<String, String>> isCountStar = e -> > e.getValue() > + .contains("\"" + ElasticsearchConstants.ID + "\""); > + > + // list of expressions which are count(*) > + final Set<String> countAll = aggregations.stream() > + .filter(isCountStar) > + .map(Map.Entry::getKey).collect(Collectors.toSet()); > + > + final Map<String, String> fieldMap = new HashMap<>(); > + > + // due to ES aggregation format. fields in "order by" clause should go > first > + // if "order by" is missing. order in "group by" is un-important > + final Set<String> orderedGroupBy = new LinkedHashSet<>(); > + > orderedGroupBy.addAll(sort.stream().map(Map.Entry::getKey).collect(Collectors.toList())); > + orderedGroupBy.addAll(groupBy); > + > + // construct nested aggregations node(s) > + ObjectNode parent = query.with(AGGREGATIONS); > + for (String name: orderedGroupBy) { > + final String aggName = "g_" + name; > + fieldMap.put(aggName, name); > + > + final ObjectNode section = parent.with(aggName); > + final ObjectNode terms = section.with("terms"); > + terms.put("field", name); > + terms.set("missing", ElasticsearchJson.MISSING_VALUE); // expose > missing terms > + > + if (fetch != null) { > + terms.put("size", fetch); > + } > + > + sort.stream().filter(e -> > e.getKey().equals(name)).findAny().ifPresent(s -> { > + terms.with("order").put("_key", s.getValue().isDescending() ? 
"desc" > : "asc"); > + }); > + > + parent = section.with(AGGREGATIONS); > + } > + > + // simple version for queries like "select count(*), max(col1) from > table" (no GROUP BY cols) > + if (!groupBy.isEmpty() || !aggregations.stream().allMatch(isCountStar)) { > + for (Map.Entry<String, String> aggregation : aggregations) { > + JsonNode value = mapper.readTree("{" + aggregation.getValue() + > "}"); > + parent.set(aggregation.getKey(), value); > + } > + } > + > + // cleanup query. remove empty AGGREGATIONS element (if empty) > + JsonNode agg = query; > + while (agg.has(AGGREGATIONS) && > agg.get(AGGREGATIONS).elements().hasNext()) { > + agg = agg.get(AGGREGATIONS); > + } > + ((ObjectNode) agg).remove(AGGREGATIONS); > + > + ElasticsearchJson.Result res = httpRequest(query); > + > + final List<Map<String, Object>> result = new ArrayList<>(); > + if (res.aggregations() != null) { > + // collect values > + ElasticsearchJson.visitValueNodes(res.aggregations(), m -> { > + Map<String, Object> newMap = new LinkedHashMap<>(); > + for (String key: m.keySet()) { > + newMap.put(fieldMap.getOrDefault(key, key), m.get(key)); > + } > + result.add(newMap); > + }); > + } else { > + // probably no group by. 
add single result > + result.add(new LinkedHashMap<>()); > + } > + > + // elastic exposes total number of documents matching a query in > "/hits/total" path > + // this can be used for simple "select count(*) from table" > + final long total = res.searchHits().total(); > + > + if (groupBy.isEmpty()) { > + // put totals automatically for count(*) expression(s), unless they > contain group by > + for (String expr : countAll) { > + result.forEach(m -> m.put(expr, total)); > + } > + } > + > + final Function1<ElasticsearchJson.SearchHit, Object> getter = > + ElasticsearchEnumerators.getter(fields); > + > + ElasticsearchJson.SearchHits hits = > + new ElasticsearchJson.SearchHits(total, result.stream() > + .map(r -> new ElasticsearchJson.SearchHit("_id", r, null)) > + .collect(Collectors.toList())); > + > + return Linq4j.asEnumerable(hits.hits()).select(getter); > + } > + > + private ElasticsearchJson.Result httpRequest(ObjectNode query) throws > IOException { > Objects.requireNonNull(query, "query"); > String uri = String.format(Locale.ROOT, "/%s/%s/_search", indexName, > typeName); > - HttpEntity entity = new StringEntity(query, > ContentType.APPLICATION_JSON); > + > + Hook.QUERY_PLAN.run(query); > + final String json = mapper.writeValueAsString(query); > + > + LOGGER.debug("Elasticsearch Query: {}", json); > + > + HttpEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON); > Response response = restClient.performRequest("POST", uri, > Collections.emptyMap(), entity); > if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { > final String error = EntityUtils.toString(response.getEntity()); > @@ -128,9 +341,75 @@ public class ElasticsearchTable extends > AbstractElasticsearchTable { > } > > try (InputStream is = response.getEntity().getContent()) { > - return mapper.readValue(is, ElasticsearchSearchResult.class); > + return mapper.readValue(is, ElasticsearchJson.Result.class); > } > } > + > + @Override public RelDataType 
getRowType(RelDataTypeFactory > relDataTypeFactory) { > + final RelDataType mapType = relDataTypeFactory.createMapType( > + relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR), > + relDataTypeFactory.createTypeWithNullability( > + relDataTypeFactory.createSqlType(SqlTypeName.ANY), > + true)); > + return relDataTypeFactory.builder().add("_MAP", mapType).build(); > + } > + > + @Override public String toString() { > + return "ElasticsearchTable{" + indexName + "/" + typeName + "}"; > + } > + > + @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider, > SchemaPlus schema, > + String tableName) { > + return new ElasticsearchQueryable<>(queryProvider, schema, this, > tableName); > + } > + > + @Override public RelNode toRel(RelOptTable.ToRelContext context, > RelOptTable relOptTable) { > + final RelOptCluster cluster = context.getCluster(); > + return new ElasticsearchTableScan(cluster, > cluster.traitSetOf(ElasticsearchRel.CONVENTION), > + relOptTable, this, null); > + } > + > + /** > + * Implementation of {@link Queryable} based on > + * a {@link ElasticsearchTable}. > + * > + * @param <T> element type > + */ > + public static class ElasticsearchQueryable<T> extends > AbstractTableQueryable<T> { > + ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema, > + ElasticsearchTable table, String tableName) { > + super(queryProvider, schema, table, tableName); > + } > + > + public Enumerator<T> enumerator() { > + return null; > + } > + > + private ElasticsearchTable getTable() { > + return (ElasticsearchTable) table; > + } > + > + /** Called via code-generation. 
> + * @param ops list of queries (as strings) > + * @param fields projection > + * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND > + * @return result as enumerable > + */ > + @SuppressWarnings("UnusedDeclaration") > + public Enumerable<Object> find(List<String> ops, > + List<Map.Entry<String, Class>> fields, > + List<Map.Entry<String, RelFieldCollation.Direction>> sort, > + List<String> groupBy, > + List<Map.Entry<String, String>> aggregations, > + Long offset, Long fetch) { > + try { > + return getTable().find(ops, fields, sort, groupBy, aggregations, > offset, fetch); > + } catch (IOException e) { > + throw new UncheckedIOException("Failed to query " + > getTable().indexName, e); > + } > + } > + > + } > } > > // End ElasticsearchTable.java > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java > index 7795ad3..3dd041a 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java > @@ -37,7 +37,7 @@ import java.util.Objects; > * using the "find" method.</p> > */ > public class ElasticsearchTableScan extends TableScan implements > ElasticsearchRel { > - private final AbstractElasticsearchTable elasticsearchTable; > + private final ElasticsearchTable elasticsearchTable; > private final RelDataType projectRowType; > > /** > @@ -50,10 +50,10 @@ public class ElasticsearchTableScan extends TableScan > implements ElasticsearchRe > * @param projectRowType Fields and types to project; null to project raw > row > */ > 
ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet, > - RelOptTable table, AbstractElasticsearchTable elasticsearchTable, > + RelOptTable table, ElasticsearchTable elasticsearchTable, > RelDataType projectRowType) { > super(cluster, traitSet, table); > - this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable); > + this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable, > "elasticsearchTable"); > this.projectRowType = projectRowType; > > assert getConvention() == ElasticsearchRel.CONVENTION; > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java > index 51a2bd5..5e788a8 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java > @@ -30,12 +30,10 @@ import org.apache.calcite.plan.RelOptCluster; > import org.apache.calcite.plan.RelOptCost; > import org.apache.calcite.plan.RelOptPlanner; > import org.apache.calcite.plan.RelTraitSet; > -import org.apache.calcite.prepare.CalcitePrepareImpl; > import org.apache.calcite.rel.RelNode; > import org.apache.calcite.rel.convert.ConverterImpl; > import org.apache.calcite.rel.metadata.RelMetadataQuery; > import org.apache.calcite.rel.type.RelDataType; > -import org.apache.calcite.runtime.Hook; > import org.apache.calcite.util.BuiltInMethod; > import org.apache.calcite.util.Pair; > > @@ -60,15 +58,15 @@ public class ElasticsearchToEnumerableConverter extends > ConverterImpl 
implements > return super.computeSelfCost(planner, mq).multiplyBy(.1); > } > > - @Override public Result implement(EnumerableRelImplementor implementor, > Prefer prefer) { > - final BlockBuilder list = new BlockBuilder(); > - final ElasticsearchRel.Implementor elasticsearchImplementor = > - new ElasticsearchRel.Implementor(); > - elasticsearchImplementor.visitChild(0, getInput()); > + @Override public Result implement(EnumerableRelImplementor relImplementor, > Prefer prefer) { > + final BlockBuilder block = new BlockBuilder(); > + final ElasticsearchRel.Implementor implementor = new > ElasticsearchRel.Implementor(); > + implementor.visitChild(0, getInput()); > + > final RelDataType rowType = getRowType(); > - final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), > rowType, > + final PhysType physType = > PhysTypeImpl.of(relImplementor.getTypeFactory(), rowType, > prefer.prefer(JavaRowFormat.ARRAY)); > - final Expression fields = list.append("fields", > + final Expression fields = block.append("fields", > constantArrayList( > Pair.zip(ElasticsearchRules.elasticsearchFieldNames(rowType), > new AbstractList<Class>() { > @@ -81,20 +79,24 @@ public class ElasticsearchToEnumerableConverter extends > ConverterImpl implements > } > }), > Pair.class)); > - final Expression table = list.append("table", > - elasticsearchImplementor.table > - > .getExpression(AbstractElasticsearchTable.ElasticsearchQueryable.class)); > - List<String> opList = elasticsearchImplementor.list; > - final Expression ops = list.append("ops", constantArrayList(opList, > String.class)); > - Expression enumerable = list.append("enumerable", > + final Expression table = block.append("table", > + implementor.table > + .getExpression(ElasticsearchTable.ElasticsearchQueryable.class)); > + List<String> opList = implementor.list; > + final Expression ops = block.append("ops", constantArrayList(opList, > String.class)); > + final Expression sort = block.append("sort", > 
constantArrayList(implementor.sort, Pair.class)); > + final Expression groupBy = block.append("groupBy", > Expressions.constant(implementor.groupBy)); > + final Expression aggregations = block.append("aggregations", > + constantArrayList(implementor.aggregations, Pair.class)); > + > + final Expression offset = block.append("offset", > Expressions.constant(implementor.offset)); > + final Expression fetch = block.append("fetch", > Expressions.constant(implementor.fetch)); > + > + Expression enumerable = block.append("enumerable", > Expressions.call(table, > ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops, > - fields)); > - if (CalcitePrepareImpl.DEBUG) { > - System.out.println("Elasticsearch: " + opList); > - } > - Hook.QUERY_PLAN.run(opList); > - list.add(Expressions.return_(null, enumerable)); > - return implementor.result(physType, list.toBlock()); > + fields, sort, groupBy, aggregations, offset, fetch)); > + block.add(Expressions.return_(null, enumerable)); > + return relImplementor.result(physType, block.toBlock()); > } > > /** E.g. 
{@code constantArrayList("x", "y")} returns > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java > ---------------------------------------------------------------------- > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java > > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java > index 97f7943..a866fe4 100644 > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java > @@ -78,6 +78,16 @@ class PredicateAnalyzer { > } > } > > + /** > + * Thrown when {@link org.apache.calcite.rel.RelNode} expression can't be > processed > + * (or converted into ES query) > + */ > + static class ExpressionNotAnalyzableException extends Exception { > + ExpressionNotAnalyzableException(String message, Throwable cause) { > + super(message, cause); > + } > + } > + > private PredicateAnalyzer() {} > > /** >
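The nested request that `ElasticsearchTable#aggregate` builds with Jackson is hard to see from the diff alone. Below is a hypothetical, dependency-free sketch that renders the same layout as a plain string. The class `NestedTermsSketch` and its methods are invented for illustration and are not part of the commit. The sketch assumes metric bodies arrive already rendered (for example `"max":{"field":"pop"}`). It omits the `missing`, `size`, and count(*) special cases that the real code handles.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

/**
 * Hypothetical sketch (not part of the commit): renders the nested "terms"
 * aggregation layout that ElasticsearchTable#aggregate assembles with Jackson.
 * The "missing", "size" and count(*) handling of the real code is omitted.
 */
public class NestedTermsSketch {

  /** Returns the search body for the given GROUP BY, ORDER BY and metric inputs. */
  static String query(List<String> groupBy, Map<String, Boolean> sortDescending,
      Map<String, String> metrics) {
    // ORDER BY columns become the outermost buckets; remaining GROUP BY columns follow
    Set<String> ordered = new LinkedHashSet<>(sortDescending.keySet());
    ordered.addAll(groupBy);
    String aggs = level(new ArrayList<>(ordered), 0, sortDescending, metrics);
    // only aggregation results are needed, so no hits or _source are requested
    StringBuilder sb = new StringBuilder("{\"_source\":false,\"size\":0");
    if (!aggs.equals("{}")) {
      // like the commit's cleanup step: never send an empty "aggregations" node
      sb.append(",\"aggregations\":").append(aggs);
    }
    return sb.append('}').toString();
  }

  /** Renders the "aggregations" object for nesting depth i. */
  private static String level(List<String> fields, int i,
      Map<String, Boolean> sortDescending, Map<String, String> metrics) {
    if (i == fields.size()) {
      // innermost level: metric aggregations keyed by output expression name
      StringJoiner joiner = new StringJoiner(",", "{", "}");
      metrics.forEach((name, body) -> joiner.add("\"" + name + "\":{" + body + "}"));
      return joiner.toString();
    }
    String field = fields.get(i);
    StringBuilder terms = new StringBuilder("{\"field\":\"").append(field).append('"');
    Boolean desc = sortDescending.get(field);
    if (desc != null) {
      // sorting on a group key is expressed as bucket order by "_key"
      terms.append(",\"order\":{\"_key\":\"").append(desc ? "desc" : "asc").append("\"}");
    }
    terms.append('}');
    String nested = level(fields, i + 1, sortDescending, metrics);
    StringBuilder section = new StringBuilder("{\"terms\":").append(terms);
    if (!nested.equals("{}")) {
      section.append(",\"aggregations\":").append(nested);
    }
    section.append('}');
    // bucket names carry a "g_" prefix, as in the commit
    return "{\"g_" + field + "\":" + section + "}";
  }

  public static void main(String[] args) {
    Map<String, Boolean> sort = new LinkedHashMap<>();
    sort.put("state", false); // ORDER BY state ASC
    Map<String, String> metrics = new LinkedHashMap<>();
    metrics.put("max_pop", "\"max\":{\"field\":\"pop\"}");
    System.out.println(query(List.of("city"), sort, metrics));
  }
}
```

Running `main` for `GROUP BY city ORDER BY state` places `g_state` outside `g_city`. This matches the commit's comment that ORDER BY fields must come first in the ES aggregation format.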
