This is an automated email from the ASF dual-hosted git repository.

sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 234c2fc  [CALCITE-2760] ElasticSearch adapter. Support sorting on 
unmapped fields (eg. _MAP['city'])
234c2fc is described below

commit 234c2fc4039781afe774be5466f0bc5586f09e08
Author: Andrei Sereda <[email protected]>
AuthorDate: Wed Jan 2 17:20:02 2019 -0500

    [CALCITE-2760] ElasticSearch adapter. Support sorting on unmapped fields 
(eg. _MAP['city'])
    
    Previously, queries like the ones below failed because there was no mapping
    between projections and items (as in `_MAP['foo']`):
    
    ```sql
    select * from elastic order by _MAP['city']
    select _MAP['city'] from elastic order by _MAP['state']
    ```
    
    Reuse the mapping from `ElasticsearchRel.Implementor.expressionItemMap` in
    `ElasticsearchSort` if the current Elasticsearch field name is unknown
    (like `EXPR$0`).
    
    Closes #985
---
 .../adapter/elasticsearch/ElasticsearchSort.java   |  19 +++-
 .../elasticsearch/ElasticSearchAdapterTest.java    | 100 ++++++++++++++++-----
 2 files changed, 96 insertions(+), 23 deletions(-)

diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
index 9078b72..7adab31 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
@@ -30,6 +30,9 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * Implementation of {@link org.apache.calcite.rel.core.Sort}
@@ -58,7 +61,21 @@ public class ElasticsearchSort extends Sort implements 
ElasticsearchRel {
 
     for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
       final String name = fields.get(fieldCollation.getFieldIndex()).getName();
-      implementor.addSort(name, fieldCollation.getDirection());
+      // TODO there should be a better way to extract original ITEM
+      if (name.toUpperCase(Locale.ROOT).startsWith("EXPR$")) {
+        Optional<String> item = implementor.expressionItemMap.stream()
+            .filter(m -> name.equals(m.getKey()))
+            .map(Map.Entry::getValue).findAny();
+
+        if (!item.isPresent()) {
+          final String message = String.format(Locale.ROOT, "No mapping found 
for %s", name);
+          throw new IllegalStateException(message);
+        }
+
+        item.ifPresent(m -> implementor.addSort(m, 
fieldCollation.getDirection()));
+      } else {
+        implementor.addSort(name, fieldCollation.getDirection());
+      }
     }
 
     if (offset != null) {
diff --git 
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
 
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index 8b09b88..f5a1265 100644
--- 
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++ 
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.adapter.elasticsearch;
 
 import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.impl.ViewTable;
 import org.apache.calcite.schema.impl.ViewTableMacro;
@@ -45,6 +46,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Consumer;
 
 /**
@@ -195,31 +197,10 @@ public class ElasticSearchAdapterTest {
         + "    ElasticsearchProject(city=[CAST(ITEM($0, 'city')):VARCHAR(20) 
CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"], 
longitude=[CAST(ITEM(ITEM($0, 'loc'), 0)):FLOAT], latitude=[CAST(ITEM(ITEM($0, 
'loc'), 1)):FLOAT], pop=[CAST(ITEM($0, 'pop')):INTEGER], state=[CAST(ITEM($0, 
'state')):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE 
\"ISO-8859-1$en_US$primary\"], id=[CAST(ITEM($0, 'id')):VARCHAR(5) CHARACTER 
SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\"])\n"
         + "      ElasticsearchTableScan(table=[[elastic, zips]])";
 
-    Consumer<ResultSet> checker = rset -> {
-      try {
-        final List<String> states = new ArrayList<>();
-        while (rset.next()) {
-          states.add(rset.getString("state"));
-        }
-        for (int i = 0; i < states.size() - 1; i++) {
-          String current = states.get(i);
-          String next = states.get(i + 1);
-          if (current.compareTo(next) > 0) {
-            final String message = String.format(Locale.ROOT,
-                "Not sorted: %s (index:%d) > %s (index:%d) count: %d",
-                current, i, next, i + 1, states.size());
-            throw new AssertionError(message);
-          }
-        }
-      } catch (SQLException e) {
-        throw new RuntimeException(e);
-      }
-    };
-
     calciteAssert()
         .query("select * from zips order by state")
         .returnsCount(ZIPS_SIZE)
-        .returns(checker)
+        .returns(sortedResultSetChecker("state", 
RelFieldCollation.Direction.ASCENDING))
         .explainContains(explain);
   }
 
@@ -240,6 +221,81 @@ public class ElasticSearchAdapterTest {
   }
 
   /**
+   * Throws {@code AssertionError} if result set is not sorted by {@code 
column}.
+   * {@code null}s are ignored.
+   *
+   * @param column column to be extracted (as comparable object).
+   * @param direction ascending / descending
+   * @return consumer which throws exception
+   */
+  private static Consumer<ResultSet> sortedResultSetChecker(String column,
+      RelFieldCollation.Direction direction) {
+    Objects.requireNonNull(column, "column");
+    return rset -> {
+      try {
+        final List<Comparable<?>> states = new ArrayList<>();
+        while (rset.next()) {
+          Object object = rset.getObject(column);
+          if (object != null && !(object instanceof Comparable)) {
+            final String message = String.format(Locale.ROOT, "%s is not 
comparable", object);
+            throw new IllegalStateException(message);
+          }
+          if (object != null) {
+            states.add((Comparable) object);
+          }
+        }
+        for (int i = 0; i < states.size() - 1; i++) {
+          final Comparable current = states.get(i);
+          final Comparable next = states.get(i + 1);
+          final int cmp = current.compareTo(next);
+          if (direction == RelFieldCollation.Direction.ASCENDING ? cmp > 0 : 
cmp < 0) {
+            final String message = String.format(Locale.ROOT,
+                "Column %s NOT sorted (%s): %s (index:%d) > %s (index:%d) 
count: %d",
+                column,
+                direction,
+                current, i, next, i + 1, states.size());
+            throw new AssertionError(message);
+          }
+        }
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  /**
+   * Sorting directly on items without a view.
+   *
+   * Queries of type: {@code select _MAP['a'] from elastic order by _MAP['b']}
+   */
+  @Test public void testSortNoSchema() {
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select * from elastic.zips order by _MAP['city']")
+        .returnsCount(ZIPS_SIZE);
+
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select _MAP['state'] from elastic.zips order by _MAP['city']")
+        .returnsCount(ZIPS_SIZE);
+
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select _MAP['city'] as city, _MAP['state'] from elastic.zips "
+            + "order by _MAP['city'] asc")
+        .returns(sortedResultSetChecker("city", 
RelFieldCollation.Direction.ASCENDING))
+        .returnsCount(ZIPS_SIZE);
+
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select _MAP['city'] as city, _MAP['state'] from elastic.zips "
+            + "order by _MAP['city'] desc")
+        .returns(sortedResultSetChecker("city", 
RelFieldCollation.Direction.DESCENDING))
+        .returnsCount(ZIPS_SIZE);
+
+  }
+
+  /**
    * Sort by multiple fields (in different direction: asc/desc)
    */
   @Test public void sortAscDesc() {

Reply via email to