This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 234c2fc [CALCITE-2760] ElasticSearch adapter. Support sorting on
unmapped fields (eg. _MAP['city'])
234c2fc is described below
commit 234c2fc4039781afe774be5466f0bc5586f09e08
Author: Andrei Sereda <[email protected]>
AuthorDate: Wed Jan 2 17:20:02 2019 -0500
[CALCITE-2760] ElasticSearch adapter. Support sorting on unmapped fields
(eg. _MAP['city'])
Previously, queries like the ones below failed due to a missing mapping
between projections and items (as in `_MAP['foo']`):
```sql
select * from elastic order by _MAP['city']
select _MAP['city'] from elastic order by _MAP['state']
```
Reuse the mapping from `ElasticsearchRel.Implementor.expressionItemMap` in
`ElasticsearchSort` if the current Elasticsearch field name is unknown
(like `EXPR$0`).
Closes #985
---
.../adapter/elasticsearch/ElasticsearchSort.java | 19 +++-
.../elasticsearch/ElasticSearchAdapterTest.java | 100 ++++++++++++++++-----
2 files changed, 96 insertions(+), 23 deletions(-)
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
index 9078b72..7adab31 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
@@ -30,6 +30,9 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
/**
* Implementation of {@link org.apache.calcite.rel.core.Sort}
@@ -58,7 +61,21 @@ public class ElasticsearchSort extends Sort implements
ElasticsearchRel {
for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
final String name = fields.get(fieldCollation.getFieldIndex()).getName();
- implementor.addSort(name, fieldCollation.getDirection());
+ // TODO there should be a better way to extract original ITEM
+ if (name.toUpperCase(Locale.ROOT).startsWith("EXPR$")) {
+ Optional<String> item = implementor.expressionItemMap.stream()
+ .filter(m -> name.equals(m.getKey()))
+ .map(Map.Entry::getValue).findAny();
+
+ if (!item.isPresent()) {
+ final String message = String.format(Locale.ROOT, "No mapping found
for %s", name);
+ throw new IllegalStateException(message);
+ }
+
+ item.ifPresent(m -> implementor.addSort(m,
fieldCollation.getDirection()));
+ } else {
+ implementor.addSort(name, fieldCollation.getDirection());
+ }
}
if (offset != null) {
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index 8b09b88..f5a1265 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -17,6 +17,7 @@
package org.apache.calcite.adapter.elasticsearch;
import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;
@@ -45,6 +46,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Consumer;
/**
@@ -195,31 +197,10 @@ public class ElasticSearchAdapterTest {
+ " ElasticsearchProject(city=[CAST(ITEM($0, 'city')):VARCHAR(20)
CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"],
longitude=[CAST(ITEM(ITEM($0, 'loc'), 0)):FLOAT], latitude=[CAST(ITEM(ITEM($0,
'loc'), 1)):FLOAT], pop=[CAST(ITEM($0, 'pop')):INTEGER], state=[CAST(ITEM($0,
'state')):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE
\"ISO-8859-1$en_US$primary\"], id=[CAST(ITEM($0, 'id')):VARCHAR(5) CHARACTER
SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"])\n"
+ " ElasticsearchTableScan(table=[[elastic, zips]])";
- Consumer<ResultSet> checker = rset -> {
- try {
- final List<String> states = new ArrayList<>();
- while (rset.next()) {
- states.add(rset.getString("state"));
- }
- for (int i = 0; i < states.size() - 1; i++) {
- String current = states.get(i);
- String next = states.get(i + 1);
- if (current.compareTo(next) > 0) {
- final String message = String.format(Locale.ROOT,
- "Not sorted: %s (index:%d) > %s (index:%d) count: %d",
- current, i, next, i + 1, states.size());
- throw new AssertionError(message);
- }
- }
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- };
-
calciteAssert()
.query("select * from zips order by state")
.returnsCount(ZIPS_SIZE)
- .returns(checker)
+ .returns(sortedResultSetChecker("state",
RelFieldCollation.Direction.ASCENDING))
.explainContains(explain);
}
@@ -240,6 +221,81 @@ public class ElasticSearchAdapterTest {
}
/**
+ * Throws {@code AssertionError} if result set is not sorted by {@code
column}.
+ * {@code null}s are ignored.
+ *
+ * @param column column to be extracted (as comparable object).
+ * @param direction ascending / descending
+ * @return consumer which throws exception
+ */
+ private static Consumer<ResultSet> sortedResultSetChecker(String column,
+ RelFieldCollation.Direction direction) {
+ Objects.requireNonNull(column, "column");
+ return rset -> {
+ try {
+ final List<Comparable<?>> states = new ArrayList<>();
+ while (rset.next()) {
+ Object object = rset.getObject(column);
+ if (object != null && !(object instanceof Comparable)) {
+ final String message = String.format(Locale.ROOT, "%s is not
comparable", object);
+ throw new IllegalStateException(message);
+ }
+ if (object != null) {
+ states.add((Comparable) object);
+ }
+ }
+ for (int i = 0; i < states.size() - 1; i++) {
+ final Comparable current = states.get(i);
+ final Comparable next = states.get(i + 1);
+ final int cmp = current.compareTo(next);
+ if (direction == RelFieldCollation.Direction.ASCENDING ? cmp > 0 :
cmp < 0) {
+ final String message = String.format(Locale.ROOT,
+ "Column %s NOT sorted (%s): %s (index:%d) > %s (index:%d)
count: %d",
+ column,
+ direction,
+ current, i, next, i + 1, states.size());
+ throw new AssertionError(message);
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ /**
+ * Sorting directly on items without a view.
+ *
+ * Queries of type: {@code select _MAP['a'] from elastic order by _MAP['b']}
+ */
+ @Test public void testSortNoSchema() {
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select * from elastic.zips order by _MAP['city']")
+ .returnsCount(ZIPS_SIZE);
+
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select _MAP['state'] from elastic.zips order by _MAP['city']")
+ .returnsCount(ZIPS_SIZE);
+
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select _MAP['city'] as city, _MAP['state'] from elastic.zips "
+ + "order by _MAP['city'] asc")
+ .returns(sortedResultSetChecker("city",
RelFieldCollation.Direction.ASCENDING))
+ .returnsCount(ZIPS_SIZE);
+
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select _MAP['city'] as city, _MAP['state'] from elastic.zips "
+ + "order by _MAP['city'] desc")
+ .returns(sortedResultSetChecker("city",
RelFieldCollation.Direction.DESCENDING))
+ .returnsCount(ZIPS_SIZE);
+
+ }
+
+ /**
* Sort by multiple fields (in different direction: asc/desc)
*/
@Test public void sortAscDesc() {