This is an automated email from the ASF dual-hosted git repository.

mbudiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/main by this push:
     new 32426043fd [CALCITE-6274] Two Elasticsearch index join return empty in any case
32426043fd is described below

commit 32426043fd4cae7db6faf00d9b6bbc89c2c7bb30
Author: Wang Zhao <[email protected]>
AuthorDate: Wed Feb 21 00:14:58 2024 +0800

    [CALCITE-6274] Two Elasticsearch index join return empty in any case
---
 .../adapter/elasticsearch/ElasticsearchRules.java  |   3 +-
 .../calcite/adapter/elasticsearch/JoinTest.java    | 120 +++++++++++++++++++++
 2 files changed, 121 insertions(+), 2 deletions(-)

diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
index 22da4febe8..e4968c9e67 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -115,8 +115,7 @@ static List<String> elasticsearchFieldNames(final RelDataType rowType) {
     return SqlValidatorUtil.uniquify(
         new AbstractList<String>() {
           @Override public String get(int index) {
-            final String name = rowType.getFieldList().get(index).getName();
-            return name.startsWith("$") ? "_" + name.substring(2) : name;
+            return rowType.getFieldList().get(index).getName();
           }
 
           @Override public int size() {
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/JoinTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/JoinTest.java
new file mode 100644
index 0000000000..92db202241
--- /dev/null
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/JoinTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.test.CalciteAssert;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.ResourceAccessMode;
+import org.junit.jupiter.api.parallel.ResourceLock;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Testing Elasticsearch join query.
+ */
+@ResourceLock(value = "elasticsearch-scrolls", mode = ResourceAccessMode.READ)
+public class JoinTest {
+
+  public static final EmbeddedElasticsearchPolicy NODE = EmbeddedElasticsearchPolicy.create();
+
+  private static final String NAME_LEFT = "lt";
+
+  private static final String NAME_RIGHT = "rt";
+
+
+  @BeforeAll
+  public static void setupInstance() throws Exception {
+    final Map<String, String> ltMappings = ImmutableMap.<String, String>builder()
+        .put("doc_id", "keyword")
+        .put("val1", "long")
+        .build();
+
+    final Map<String, String> rtMappings = ImmutableMap.<String, String>builder()
+        .put("doc_id", "keyword")
+        .put("val2", "long")
+        .build();
+
+    NODE.createIndex(NAME_LEFT, ltMappings);
+    NODE.createIndex(NAME_RIGHT, rtMappings);
+    final ObjectMapper mapper = new ObjectMapper()
+        .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES) // user-friendly settings to
+        .enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES); // avoid too much quoting
+
+    String ldoc1 = "{doc_id:'1', val1:1}";
+    String ldoc2 = "{doc_id:'2', val1:2}";
+    final List<ObjectNode> docs = new ArrayList<>();
+    for (String text : Arrays.asList(ldoc1, ldoc2)) {
+      docs.add((ObjectNode) mapper.readTree(text));
+    }
+    NODE.insertBulk(NAME_LEFT, docs);
+
+
+    String rdoc1 = "{doc_id:'1', val2:1}";
+    String rdoc2 = "{doc_id:'2', val2:2}";
+    final List<ObjectNode> rdocs = new ArrayList<>();
+    for (String text : Arrays.asList(rdoc1, rdoc2)) {
+      rdocs.add((ObjectNode) mapper.readTree(text));
+    }
+    NODE.insertBulk(NAME_RIGHT, rdocs);
+  }
+
+  private static Connection createConnection() throws SQLException {
+    final Connection connection = DriverManager.getConnection("jdbc:calcite:");
+    final SchemaPlus root =
+        connection.unwrap(CalciteConnection.class).getRootSchema();
+
+    root.add("elastic0", new ElasticsearchSchema(NODE.restClient(), NODE.mapper(), NAME_LEFT));
+    root.add("elastic1", new ElasticsearchSchema(NODE.restClient(), NODE.mapper(), NAME_RIGHT));
+
+    return connection;
+  }
+
+  /**
+   * Test two elasticserch index join.
+   */
+  @Test void join() {
+    CalciteAssert.that()
+        .with(JoinTest::createConnection)
+        .query(
+            String.format(Locale.ROOT,
+                "select t._MAP['doc_id'] AS \"doc_id\", t._MAP['val1'] AS \"val1\" "
+            + " from \"elastic0\".\"%s\" t "
+            + " join \"elastic1\".\"%s\" s"
+            + " on cast(t._MAP['doc_id'] as varchar) = cast(s._MAP['doc_id'] as varchar)",
+                NAME_LEFT, NAME_RIGHT))
+        .returnsUnordered("doc_id=1; val1=1",
+            "doc_id=2; val1=2");
+
+  }
+
+}

Reply via email to