http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 82dc184..a45d55e 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,11 +17,19 @@
  */
 package org.apache.storm.sql;
 
+import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.LocalCluster;
 import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.DataSourcesRegistry;
 import org.apache.storm.sql.runtime.FieldInfo;
@@ -29,464 +37,270 @@ import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.tuple.Values;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 public class TestStormSql {
-  private static class MockDataSourceProvider implements DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mock";
+
+    public static final int WAIT_TIMEOUT_MS = 1000 * 1000;
+    public static final int WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED = 1000 * 10;
+    public static final int WAIT_TIMEOUT_MS_ERROR_EXPECTED = 1000;
+
+    private static class MockDataSourceProvider implements DataSourcesProvider 
{
+        @Override
+        public String scheme() {
+            return "mock";
+        }
+
+        @Override
+        public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
+                                                      Properties properties, 
List<FieldInfo> fields) {
+            return new TestUtils.MockSqlExprDataSource();
+        }
+    }
+
+    private static class MockNestedDataSourceProvider implements 
DataSourcesProvider {
+        @Override
+        public String scheme() {
+            return "mocknested";
+        }
+
+        @Override
+        public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
+                                                      Properties properties, 
List<FieldInfo> fields) {
+            return new TestUtils.MockSqlTridentNestedDataSource();
+        }
     }
 
-    @Override
-    public DataSource construct(
-        URI uri, String inputFormatClass, String outputFormatClass,
-        List<FieldInfo> fields) {
-      return new TestUtils.MockDataSource();
+    private static class MockGroupDataSourceProvider implements 
DataSourcesProvider {
+        @Override
+        public String scheme() {
+            return "mockgroup";
+        }
+
+        @Override
+        public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
+                                                      Properties properties, 
List<FieldInfo> fields) {
+            return new TestUtils.MockSqlTridentGroupedDataSource();
+        }
     }
 
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
-                                                  Properties properties, 
List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentDataSource();
+    private static class MockEmpDataSourceProvider implements 
DataSourcesProvider {
+        @Override
+        public String scheme() {
+            return "mockemp";
+        }
+
+        @Override
+        public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
+                                                      Properties properties, 
List<FieldInfo> fields) {
+            return new TestUtils.MockSqlTridentJoinDataSourceEmp();
+        }
     }
-  }
 
-  private static class MockNestedDataSourceProvider implements 
DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mocknested";
+    private static class MockDeptDataSourceProvider implements 
DataSourcesProvider {
+        @Override
+        public String scheme() {
+            return "mockdept";
+        }
+
+        @Override
+        public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
+                                                      Properties properties, 
List<FieldInfo> fields) {
+            return new TestUtils.MockSqlTridentJoinDataSourceDept();
+        }
     }
 
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockNestedDataSource();
+    private static LocalCluster cluster;
+
+    @BeforeClass
+    public static void staticSetup() throws Exception {
+        DataSourcesRegistry.providerMap().put("mock", new 
MockDataSourceProvider());
+        DataSourcesRegistry.providerMap().put("mocknested", new 
MockNestedDataSourceProvider());
+        DataSourcesRegistry.providerMap().put("mockgroup", new 
MockGroupDataSourceProvider());
+        DataSourcesRegistry.providerMap().put("mockemp", new 
MockEmpDataSourceProvider());
+        DataSourcesRegistry.providerMap().put("mockdept", new 
MockDeptDataSourceProvider());
+
+        cluster = new LocalCluster();
     }
 
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
-                                                  Properties properties, 
List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentDataSource();
+    @AfterClass
+    public static void staticCleanup() {
+        DataSourcesRegistry.providerMap().remove("mock");
+        DataSourcesRegistry.providerMap().remove("mocknested");
+        DataSourcesRegistry.providerMap().remove("mockgroup");
+        DataSourcesRegistry.providerMap().remove("mockemp");
+        DataSourcesRegistry.providerMap().remove("mockdept");
+
+        if (cluster != null) {
+            cluster.shutdown();
+            cluster = null;
+        }
     }
-  }
 
-  private static class MockGroupDataSourceProvider implements 
DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mockgroup";
+    @Before
+    public void setUp() {
+        getCollectedValues().clear();
     }
 
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockGroupDataSource();
+    @Test
+    public void testExternalDataSource() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("INSERT INTO BAR SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        List<List<Object>> values = getCollectedValues();
+        impl.runLocal(cluster, stmt, (__) -> values.size() >= 2, 
WAIT_TIMEOUT_MS);
+
+        Assert.assertEquals(2, values.size());
+        Assert.assertEquals(4, values.get(0).get(0));
+        Assert.assertEquals(5, values.get(1).get(0));
     }
 
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
-                                                  Properties properties, 
List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentGroupedDataSource();
+    @Test
+    public void testExternalDataSourceNested() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("INSERT INTO BAR SELECT STREAM ID, MAPFIELD['c'], 
NESTEDMAPFIELD, ARRAYFIELD " +
+                "FROM FOO " +
+                "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND 
CAST(ARRAYFIELD[2] AS INTEGER) = 200");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        List<List<Object>> values = getCollectedValues();
+        impl.runLocal(cluster, stmt, (__) -> values.size() >= 1, 
WAIT_TIMEOUT_MS);
+
+        Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+        Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", 
map);
+        Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 
200, 300)), values.get(0));
     }
-  }
 
-  private static class MockEmpDataSourceProvider implements 
DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mockemp";
+    @Test
+    public void testExternalNestedNonExistKeyAccess() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        // this triggers java.lang.RuntimeException: Cannot convert null to int
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("INSERT INTO BAR SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, 
ARRAYFIELD " +
+                "FROM FOO " +
+                "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        List<List<Object>> values = getCollectedValues();
+        impl.runLocal(cluster, stmt, (__) -> true, 
WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
+
+        Assert.assertEquals(0, values.size());
+    }
+
+    @Test
+    public void testExternalNestedNonExistKeyAccess2() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        // this triggers java.lang.RuntimeException: Cannot convert null to int
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("INSERT INTO BAR SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, 
ARRAYFIELD " +
+                "FROM FOO " +
+                "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        List<List<Object>> values = getCollectedValues();
+        impl.runLocal(cluster, stmt, (__) -> true, 
WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
+
+        Assert.assertEquals(0, values.size());
+    }
+
+    @Test
+    public void testExternalNestedInvalidAccessStringIndexOnArray() throws 
Exception {
+        List<String> stmt = new ArrayList<>();
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("INSERT INTO BAR SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, 
ARRAYFIELD " +
+                "FROM FOO " +
+                "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        List<List<Object>> values = getCollectedValues();
+        impl.runLocal(cluster, stmt, (__) -> true, 
WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
+
+        Assert.assertEquals(0, values.size());
     }
 
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockEmpDataSource();
+    @Test
+    public void testExternalNestedArrayOutOfBoundAccess() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("INSERT INTO BAR SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, 
ARRAYFIELD " +
+                "FROM FOO " +
+                "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        List<List<Object>> values = getCollectedValues();
+        impl.runLocal(cluster, stmt, (__) -> true, 
WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
+
+        Assert.assertEquals(0, values.size());
     }
 
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
-                                                  Properties properties, 
List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentJoinDataSourceEmp();
+    @Test(expected = ValidationException.class)
+    public void testExternalUdfType() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 
'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
+        stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE 
ID = 0");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        impl.runLocal(cluster, stmt, (__) -> true, 
WAIT_TIMEOUT_MS_ERROR_EXPECTED);
+
+        Assert.fail("Should raise ValidationException.");
     }
-  }
 
-  private static class MockDeptDataSourceProvider implements 
DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mockdept";
+    @Test(expected = CompilingClassLoader.CompilerException.class)
+    public void testExternalUdfType2() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        // generated code will be not compilable since return type of MYPLUS 
and type of 'x' are different
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 
'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
+        stmt.add("INSERT INTO BAR SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 
1) = 'x'");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        impl.runLocal(cluster, stmt, (__) -> true, 
WAIT_TIMEOUT_MS_ERROR_EXPECTED);
+
+        Assert.fail("Should raise CompilerException.");
     }
 
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockDeptDataSource();
+    @Test
+    public void testExternalUdf() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
+        stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE 
ID > 2");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        List<List<Object>> values = getCollectedValues();
+        impl.runLocal(cluster, stmt, (__) -> values.size() >= 2, 
WAIT_TIMEOUT_MS);
+
+        Assert.assertEquals(2, values.size());
+        Assert.assertEquals(4, values.get(0).get(0));
+        Assert.assertEquals(5, values.get(1).get(0));
     }
 
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
-                                                  Properties properties, 
List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentJoinDataSourceDept();
+    @Test(expected = UnsupportedOperationException.class)
+    public void testExternalUdfUsingJar() throws Exception {
+        List<String> stmt = new ArrayList<>();
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
+        stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE 
ID > 2");
+        StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
+
+        impl.runLocal(cluster, stmt, (__) -> true, 
WAIT_TIMEOUT_MS_ERROR_EXPECTED);
+
+        Assert.fail("Should raise UnsupportedOperationException.");
     }
-  }
-
-
-  @BeforeClass
-  public static void setUp() {
-    DataSourcesRegistry.providerMap().put("mock", new 
MockDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mocknested", new 
MockNestedDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mockgroup", new 
MockGroupDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mockemp", new 
MockEmpDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mockdept", new 
MockDeptDataSourceProvider());
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    DataSourcesRegistry.providerMap().remove("mock");
-    DataSourcesRegistry.providerMap().remove("mocknested");
-  }
-
-  @Test
-  public void testExternalDataSource() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-    stmt.add("SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(2, values.size());
-    Assert.assertEquals(4, values.get(0).get(0));
-    Assert.assertEquals(5, values.get(1).get(0));
-  }
-
-  @Test
-  public void testExternalDataSourceNested() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
-                     "FROM FOO " +
-                     "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND 
CAST(ARRAYFIELD[2] AS INTEGER) = 200");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    System.out.println(values);
-    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
-    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-    Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 
300)), values.get(0));
-  }
-
-  @Test
-  public void testExternalNestedNonExistKeyAccess() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    // this triggers java.lang.RuntimeException: Cannot convert null to int
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalNestedNonExistKeyAccess2() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    // this triggers java.lang.RuntimeException: Cannot convert null to int
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalNestedInvalidAccessStringIndexOnArray() throws 
Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalNestedArrayOutOfBoundAccess() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test(expected = ValidationException.class)
-  public void testExternalUdfType() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 
'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
-    stmt.add("SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    System.out.println(values);
-
-  }
-
-  @Test(expected = CompilingClassLoader.CompilerException.class)
-  public void testExternalUdfType2() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    // generated code will be not compilable since return type of MYPLUS and 
type of 'x' are different
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 
'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
-    stmt.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalUdf() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
-    stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(2, values.size());
-    Assert.assertEquals(4, values.get(0).get(0));
-    Assert.assertEquals(5, values.get(1).get(0));
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testExternalUdfUsingJar() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
-    stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-  }
-
-  @Test
-  public void testGroupbyBuiltin() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(SALARY) FROM FOO 
GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(3, values.get(0).get(2));
-    Assert.assertEquals(12, values.get(1).get(2));
-    Assert.assertEquals(21, values.get(2).get(2));
-    Assert.assertEquals(9, values.get(3).get(2));
-  }
-
-  @Test
-  public void testGroupbyBuiltinWithFilter() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(PCT) FROM FOO WHERE 
ID = 1 GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(1, values.size());
-    Assert.assertEquals(1, values.get(0).get(0));
-    Assert.assertEquals(3L, values.get(0).get(1));
-    Assert.assertEquals(12, values.get(0).get(2));
-    Assert.assertEquals(2.5, values.get(0).get(3));
-  }
-
-  @Test
-  public void testGroupbyBuiltinAndUDF() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("CREATE FUNCTION MYCONCAT AS 
'org.apache.storm.sql.TestUtils$MyConcat'");
-    stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
-    stmt.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME), TOPN(2, SALARY) 
FROM FOO GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(3, values.get(0).get(1));
-    Assert.assertEquals("xxx", values.get(0).get(2));
-    Assert.assertEquals(Arrays.asList(2, 1), values.get(0).get(3));
-    Assert.assertEquals(12, values.get(1).get(1));
-    Assert.assertEquals("xxx", values.get(1).get(2));
-    Assert.assertEquals(Arrays.asList(5, 4), values.get(1).get(3));
-    Assert.assertEquals(21, values.get(2).get(1));
-    Assert.assertEquals("xxx", values.get(2).get(2));
-    Assert.assertEquals(Arrays.asList(8, 7), values.get(2).get(3));
-    Assert.assertEquals(9, values.get(3).get(1));
-    Assert.assertEquals("x", values.get(3).get(2));
-    Assert.assertEquals(Arrays.asList(9), values.get(3).get(3));
-  }
-
-  @Test
-  public void testAggFnNonSqlReturnType() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
-    stmt.add("SELECT STREAM ID, SUM(SALARY), TOPN(1, SALARY) FROM FOO WHERE ID 
>= 0 GROUP BY (ID) HAVING MAX(SALARY) > 0");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(Collections.singletonList(2), values.get(0).get(2));
-    Assert.assertEquals(Collections.singletonList(5), values.get(1).get(2));
-    Assert.assertEquals(Collections.singletonList(8), values.get(2).get(2));
-    Assert.assertEquals(Collections.singletonList(9), values.get(3).get(2));
-  }
-
-  @Test
-  public void testGroupbySameAggregateOnDifferentColumns() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), AVG(SALARY), AVG(PCT) FROM FOO WHERE 
ID = 1 GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(1, values.size());
-    Assert.assertEquals(1, values.get(0).get(0));
-    Assert.assertEquals(3L, values.get(0).get(1));
-    Assert.assertEquals(4, values.get(0).get(2));
-    Assert.assertEquals(2.5, values.get(0).get(3));
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testGroupbyBuiltinNotimplemented() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), STDDEV_POP(SALARY) FROM FOO GROUP BY 
(ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-  }
-
-  @Test
-  public void testMinMax() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), MIN(SALARY), MAX(PCT) FROM FOO GROUP 
BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(0, values.get(0).get(2));
-    Assert.assertEquals(3, values.get(1).get(2));
-    Assert.assertEquals(6, values.get(2).get(2));
-    Assert.assertEquals(9, values.get(3).get(2));
-
-    Assert.assertEquals(1.5, values.get(0).get(3));
-    Assert.assertEquals(3.0, values.get(1).get(3));
-    Assert.assertEquals(4.5, values.get(2).get(3));
-    Assert.assertEquals(5.0, values.get(3).get(3));
-  }
-  @Test
-  public void testFilterGroupbyHaving() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, MIN(SALARY) FROM FOO where ID > 0 GROUP BY 
(ID) HAVING ID > 2 AND MAX(SALARY) > 5");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(1, values.size());
-    Assert.assertEquals(3, values.get(0).get(0));
-    Assert.assertEquals(9, values.get(0).get(1));
-  }
-
-  @Test
-  public void testGroupByMultipleFields() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (DEPTID INT PRIMARY KEY, SALARY INT, 
PCT DOUBLE, NAME VARCHAR, EMPID INT) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM DEPTID, EMPID, COUNT(*), MIN(SALARY), MAX(PCT) 
FROM FOO GROUP BY DEPTID, EMPID");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(7, values.size());
-    Assert.assertEquals(0, values.get(0).get(0));
-    Assert.assertEquals(0, values.get(0).get(1));
-    Assert.assertEquals(2L, values.get(0).get(2));
-  }
-
-  @Test
-  public void testjoin() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME 
VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
-    stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME 
VARCHAR) LOCATION 'mockdept:///foo'");
-    stmt.add("SELECT STREAM EMPID, EMPNAME, DEPTNAME FROM EMP AS e JOIN DEPT 
AS d ON e.DEPTID = d.DEPTID WHERE e.empid > 0");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    System.out.println(values);
-    Assert.assertEquals(3, values.size());
-    Assert.assertEquals("emp1", values.get(0).get(1));
-    Assert.assertEquals("dept1", values.get(0).get(2));
-    Assert.assertEquals("emp2", values.get(1).get(1));
-    Assert.assertEquals("dept1", values.get(1).get(2));
-    Assert.assertEquals("emp3", values.get(2).get(1));
-    Assert.assertEquals("dept2", values.get(2).get(2));
-  }
-
-  @Test
-  public void testjoinAndGroupby() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME 
VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
-    stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME 
VARCHAR) LOCATION 'mockdept:///foo'");
-    stmt.add("SELECT STREAM d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS 
d ON e.DEPTID = d.DEPTID WHERE e.empid > 0" +
-                     "GROUP BY d.DEPTID");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(2, values.size());
-    Assert.assertEquals(1, values.get(0).get(0));
-    Assert.assertEquals(2L, values.get(0).get(1));
-    Assert.assertEquals(2, values.get(1).get(0));
-    Assert.assertEquals(1L, values.get(1).get(1));
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
deleted file mode 100644
index 634e454..0000000
--- 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.base.Function;
-import org.apache.storm.sql.compiler.backends.standalone.TestCompilerUtils;
-import org.apache.storm.tuple.Values;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.TestUtils;
-import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TestExprSemantic {
-  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-
-  @Test
-  public void testLogicalExpr() throws Exception {
-    Values v = testExpr(
-        Lists.newArrayList("ID > 0 OR ID < 1", "ID > 0 AND ID < 1",
-                           "NOT (ID > 0 AND ID < 1)"));
-    assertEquals(new Values(true, false, true), v);
-  }
-
-  @Test
-  public void testExpectOperator() throws Exception {
-    Values v = testExpr(
-        Lists.newArrayList("TRUE IS TRUE", "TRUE IS NOT TRUE",
-                           "UNKNOWN IS TRUE", "UNKNOWN IS NOT TRUE",
-                           "TRUE IS FALSE", "UNKNOWN IS NULL",
-                           "UNKNOWN IS NOT NULL"));
-    assertEquals(new Values(true, false, false, true, false, true, false), v);
-  }
-
-  @Test
-  public void testDistinctBetweenLikeSimilarIn() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList("TRUE IS DISTINCT FROM TRUE",
-                    "TRUE IS NOT DISTINCT FROM FALSE", "3 BETWEEN 1 AND 5",
-                    "10 NOT BETWEEN 1 AND 5", "'hello' LIKE '_e%'",
-                    "'world' NOT LIKE 'wor%'", "'abc' SIMILAR TO 
'[a-zA-Z]+[cd]{1}'",
-                    "'abe' NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'", "'3' IN ('1', 
'2', '3', '4')",
-                    "2 NOT IN (1, 3, 5)"));
-    assertEquals(new Values(false, false, true, true, true,
-          false, true, true, true, true), v);
-  }
-
-  @Test
-  public void testCaseStatement() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "CASE WHEN 'abcd' IN ('a', 'abc', 'abcde') THEN UPPER('a') 
" +
-                    "WHEN UPPER('abcd') = 'AB' THEN 'b' ELSE {fn 
CONCAT('abcd', '#')} END",
-                    "CASE WHEN 'ab' IN ('a', 'abc', 'abcde') THEN UPPER('a') " 
+
-                    "WHEN UPPER('ab') = 'AB' THEN 'b' ELSE {fn CONCAT('ab', 
'#')} END",
-                    "CASE WHEN 'abc' IN ('a', 'abc', 'abcde') THEN UPPER('a') 
" +
-                    "WHEN UPPER('abc') = 'AB' THEN 'b' ELSE {fn CONCAT('abc', 
'#')} END"
-                    )
-    );
-
-    // TODO: The data type of literal Calcite assigns seems to be out of 
expectation. Please see below logical plan.
-    // LogicalProject(EXPR$0=[CASE(OR(=('abcd', 'a'), =('abcd', 'abc'), 
=('abcd', 'abcde')), CAST(UPPER('a')):VARCHAR(5) CHARACTER SET "ISO-8859-1" 
COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abcd'), 
CAST('AB'):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL), 'b', CAST(||('abcd', '#')):VARCHAR(5) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], 
EXPR$1=[CASE(OR(=('ab', 'a'), =('ab', 'abc'), =('ab', 'abcde')), 
CAST(UPPER('a')):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('ab'), 'AB'), CAST('b'):CHAR(3) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, 
||('ab', '#'))], EXPR$2=[CASE(OR(=('abc', 'a'), =('abc', 'abc'), =('abc', 
'abcde')), CAST(UPPER('a')):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abc'), CAST('AB'):CHAR(3) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 
CAST('b'):CHAR(4) C
 HARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, 
||('abc', '#'))]): rowcount = 1.0, cumulative cost = {2.0 rows, 5.0 cpu, 0.0 
io}, id = 5
-    //   LogicalFilter(condition=[AND(>($0, 0), <($0, 2))]): rowcount = 1.0, 
cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 4
-    //     EnumerableTableScan(table=[[FOO]]): rowcount = 1.0, cumulative cost 
= {0.0 rows, 1.0 cpu, 0.0 io}, id = 3
-    // in result, both 'b' and UPPER('a') hence 'A' are having some spaces 
which is not expected.
-    // When we use CASE with actual column (Java String type hence VARCHAR), 
it seems to work as expected.
-    // Please refer trident/TestPlanCompiler#testCaseStatement(), and see 
below logical plan.
-    // LogicalProject(EXPR$0=[CASE(OR(=($1, 'a'), =($1, 'abc'), =($1, 
'abcde')), CAST(UPPER('a')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary", =(CAST(UPPER($1)):VARCHAR(2) CHARACTER SET 
"ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", 'AB'), 'b', CAST(||($1, 
'#')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary")]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 
cpu, 0.0 io}, id = 3
-    List<Object> v2 = Lists.transform(v, new Function<Object, Object>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable Object o) {
-        return ((String) o).trim();
-      }
-    });
-    assertArrayEquals(new Values("abcd#", "b", "A").toArray(), v2.toArray());
-  }
-
-  @Test
-  public void testNullIfAndCoalesce() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "NULLIF(5, 5)", "NULLIF(5, 0)", "COALESCE(NULL, NULL, 5, 
4, NULL)", "COALESCE(1, 5)"
-            ));
-    assertEquals(new Values(null, 5, 5, 1), v);
-  }
-
-  @Test
-  public void testCollectionFunctions() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "ELEMENT(ARRAY[3])", "CARDINALITY(ARRAY[1, 2, 3, 4, 5])"
-            ));
-    assertEquals(new Values(3, 5), v);
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void testElementFunctionMoreThanOneValue() throws Exception {
-    testExpr(
-            Lists.newArrayList(
-                    "ELEMENT(ARRAY[1, 2, 3])"
-            ));
-    fail("ELEMENT with array which has multiple elements should throw 
exception in runtime.");
-  }
-
-  @Test
-  public void testArithmeticWithNull() throws Exception {
-    Values v = testExpr(
-      Lists.newArrayList(
-          "1 + CAST(NULL AS INT)", "CAST(NULL AS INT) + 1", "CAST(NULL AS INT) 
+ CAST(NULL AS INT)", "1 + 2"
-      ));
-    assertEquals(new Values(null, null, null, 3), v);
-  }
-
-  @Test
-  public void testNotWithNull() throws Exception {
-    Values v = testExpr(
-        Lists.newArrayList(
-            "NOT TRUE", "NOT FALSE", "NOT UNKNOWN"
-        ));
-    assertEquals(new Values(false, true, null), v);
-  }
-
-  @Test
-  public void testAndWithNull() throws Exception {
-    Values v = testExpr(
-        Lists.newArrayList(
-            "UNKNOWN AND TRUE", "UNKNOWN AND FALSE", "UNKNOWN AND UNKNOWN",
-            "TRUE AND TRUE", "TRUE AND FALSE", "TRUE AND UNKNOWN",
-            "FALSE AND TRUE", "FALSE AND FALSE", "FALSE AND UNKNOWN"
-        ));
-    assertEquals(new Values(null, false, null, true, false, null, false,
-                            false, false), v);
-  }
-
-  @Test
-  public void testAndWithNullable() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "ADDR = 'a' AND NAME = 'a'", "NAME = 'a' AND ADDR = 'a'", 
"NAME = 'x' AND ADDR = 'a'", "ADDR = 'a' AND NAME = 'x'"
-            ));
-    assertEquals(new Values(false, false, null, null), v);
-  }
-
-  @Test
-  public void testOrWithNullable() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "ADDR = 'a'  OR NAME = 'a'", "NAME = 'a' OR ADDR = 'a' ", 
"NAME = 'x' OR ADDR = 'a' ", "ADDR = 'a'  OR NAME = 'x'"
-            ));
-    assertEquals(new Values(null, null, true, true), v);
-  }
-
-  @Test
-  public void testOrWithNull() throws Exception {
-    Values v = testExpr(
-        Lists.newArrayList(
-            "UNKNOWN OR TRUE", "UNKNOWN OR FALSE", "UNKNOWN OR UNKNOWN",
-            "TRUE OR TRUE", "TRUE OR FALSE", "TRUE OR UNKNOWN",
-            "FALSE OR TRUE", "FALSE OR FALSE", "FALSE OR UNKNOWN"
-            ));
-    assertEquals(new Values(true, null, null, true, true, true, true,
-                            false, null), v);
-  }
-
-  @Test
-  public void testEquals() throws Exception {
-    Values v = testExpr(
-        Lists.newArrayList(
-            "1 = 2", "UNKNOWN = UNKNOWN", "'a' = 'a'", "'a' = UNKNOWN", 
"UNKNOWN = 'a'", "'a' = 'b'",
-            "1 <> 2", "UNKNOWN <> UNKNOWN", "'a' <> 'a'", "'a' <> UNKNOWN", 
"UNKNOWN <> 'a'", "'a' <> 'b'"
-        ));
-    assertEquals(new Values(false, null, true, null, null, false,
-        true, null, false, null, null, true), v);
-  }
-
-  @Test
-  public void testArithmeticFunctions() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "POWER(3, 2)", "ABS(-10)", "MOD(10, 3)", "MOD(-10, 3)",
-                    "CEIL(123.45)", "FLOOR(123.45)"
-            ));
-
-    assertEquals(new Values(9.0d, 10, 1, -1, new BigDecimal(124), new 
BigDecimal(123)), v);
-
-    // Belows are floating numbers so comparing this with literal is tend to 
be failing...
-    // Picking int value and compare
-    Values v2 = testExpr(
-            Lists.newArrayList(
-                    "SQRT(255)", "LN(16)", "LOG10(10000)", "EXP(10)"
-            ));
-    List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
-      @Nullable
-      @Override
-      public Object apply(@Nullable Object o) {
-        // only takes int value
-        return ((Number) o).intValue();
-      }
-    });
-
-    // 15.9687, 2.7725, 4.0, 22026.465794
-    assertEquals(new Values(15, 2, 4, 22026), v2m);
-  }
-
-  @Test
-  public void testStringFunctions() throws Exception {
-    Values v = testExpr(
-        Lists.newArrayList(
-                "'ab' || 'cd'", "CHAR_LENGTH('foo')", 
"CHARACTER_LENGTH('foo')",
-                "UPPER('a')", "LOWER('A')", "POSITION('bc' IN 'abcd')",
-                "TRIM(BOTH ' ' FROM '  abcdeabcdeabc  ')",
-                "TRIM(LEADING ' ' FROM '  abcdeabcdeabc  ')",
-                "TRIM(TRAILING ' ' FROM '  abcdeabcdeabc  ')",
-                "OVERLAY('abcde' PLACING 'bc' FROM 3)",
-                "SUBSTRING('abcde' FROM 3)", "SUBSTRING('abcdeabcde' FROM 3 
FOR 4)",
-                "INITCAP('foo')"
-        ));
-    assertEquals(new Values("abcd", 3, 3, "A", "a", 2, "abcdeabcdeabc", 
"abcdeabcdeabc  ", "  abcdeabcdeabc", "abbce", "cde", "cdea", "Foo"), v);
-  }
-
-  @Test
-  public void testBinaryStringFunctions() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "x'45F0AB' || x'45F0AB'",
-                    "POSITION(x'F0' IN x'453423F0ABBC')",
-                    "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3)"
-                    // "SUBSTRING(x'453423F0ABBC' FROM 3)",
-                    // "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4)"
-            ));
-
-    // TODO: Calcite 1.9.0 has bugs on binary SUBSTRING functions
-    // as there's no 
SqlFunctions.substring(org.apache.calcite.avatica.util.ByteString, ...)
-    // commented out testing substring function
-
-    assertEquals("45f0ab45f0ab", v.get(0).toString());
-    assertEquals(4, v.get(1));
-    assertEquals("45344534abbc45", v.get(2).toString());
-    // assertEquals("23f0abbc", v.get(3).toString());
-    // assertEquals("23f0ab", v.get(4).toString());
-  }
-
-  @Test
-  public void testDateAndTimestampLiteral() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "DATE '1970-05-15' AS datefield",
-                    "TIME '00:00:00' AS timefield",
-                    "TIMESTAMP '2016-01-01 00:00:00' as timestampfield"
-            )
-    );
-
-    assertEquals(3, v.size());
-    assertEquals(134, v.get(0));
-    assertEquals(0, v.get(1));
-    assertEquals(1451606400000L, v.get(2));
-  }
-
-  @Test
-  public void testInterval() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "INTERVAL '1-5' YEAR TO MONTH AS intervalfield",
-                    "(DATE '1970-01-01', DATE '1970-01-15') AS 
anchoredinterval_field"
-            )
-    );
-
-    assertEquals(3, v.size());
-    assertEquals(17, v.get(0));
-    assertEquals(0, v.get(1));
-    assertEquals(14, v.get(2));
-  }
-
-  @Test
-  public void testDateFunctions() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "LOCALTIME = CURRENT_TIME, LOCALTIMESTAMP = 
CURRENT_TIMESTAMP, CURRENT_DATE",
-                    "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')",
-                    "FLOOR(DATE '2016-01-23' TO MONTH)",
-                    "CEIL(TIME '12:34:56' TO MINUTE)"
-            )
-    );
-
-    assertEquals(6, v.size());
-    assertTrue((boolean) v.get(0));
-    assertTrue((boolean) v.get(1));
-    // skip checking CURRENT_DATE since we don't inject dataContext so don't 
know about current timestamp
-    // we can do it from trident test
-    assertEquals(1L, v.get(3));
-    assertEquals(0L, v.get(4));
-    assertEquals(45300000, v.get(5));
-  }
-
-  @Test
-  public void testJDBCNumericFunctions() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "{fn POWER(3, 2)}", "{fn ABS(-10)}", "{fn MOD(10, 3)}", 
"{fn MOD(-10, 3)}"
-            ));
-
-    assertEquals(new Values(9.0d, 10, 1, -1), v);
-
-    // Belows are floating numbers so comparing this with literal is tend to 
be failing...
-    // Picking int value and compare
-    Values v2 = testExpr(
-            Lists.newArrayList(
-                    "{fn LOG(16)}", "{fn LOG10(10000)}", "{fn EXP(10)}"
-            ));
-    List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
-      @Nullable
-      @Override
-      public Object apply(@Nullable Object o) {
-        // only takes int value
-        return ((Number) o).intValue();
-      }
-    });
-
-    // 2.7725, 4.0, 22026.465794
-    assertEquals(new Values(2, 4, 22026), v2m);
-  }
-
-  @Test
-  public void testJDBCStringFunctions() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "{fn CONCAT('ab', 'cd')}",
-                    "{fn LOCATE('bc', 'abcdeabcde')}",
-                    //"{fn LOCATE('bc', 'abcdeabcde', 4)}",
-                    "{fn INSERT('abcd', 2, 3, 'de')}",
-                    "{fn LCASE('AbCdE')}",
-                    "{fn LENGTH('AbCdE')}",
-                    //"{fn LTRIM('  abcde  ')}",
-                    //"{fn RTRIM('  abcde  ')}",
-                    "{fn SUBSTRING('abcdeabcde', 3, 4)}",
-                    "{fn UCASE('AbCdE')}"
-            )
-    );
-
-    // TODO: Calcite 1.9.0 doesn't support {fn LOCATE(string1, string2 [, 
integer])}
-    // while it's on support list on SQL reference
-    // and bugs on LTRIM and RTRIM : throwing AssertionError: Internal error: 
pre-condition failed: pos != null
-    // commented out problematic function tests
-
-    assertEquals(new Values("abcd", 2, "ade", "abcde", 5, "cdea", "ABCDE"), v);
-  }
-
-  @Test
-  public void testJDBCDateTimeFunctions() throws Exception {
-    Values v = testExpr(
-            Lists.newArrayList(
-                    "{fn CURDATE()} = CURRENT_DATE", "{fn CURTIME()} = 
LOCALTIME", "{fn NOW()} = LOCALTIMESTAMP",
-                    "{fn QUARTER(DATE '2016-10-07')}", "{fn 
TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}",
-                    "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 
00:00:00', TIMESTAMP '2016-10-07 00:00:00')}"
-            )
-    );
-
-    assertEquals(new Values(true, true, true, 4L, 1475799300000L, 86400), v);
-  }
-
-  private Values testExpr(List<String> exprs) throws Exception {
-    String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO" +
-        " WHERE ID > 0 AND ID < 2";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    AbstractValuesProcessor proc = compiler.compile(state.tree());
-    Map<String, DataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockDataSource());
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    proc.initialize(data, h);
-    return values.get(0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
deleted file mode 100644
index 9dfc931..0000000
--- 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestCompilerUtils.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.StreamableTable;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.compiler.CompilerUtil;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestCompilerUtils {
-
-    public static class MyPlus {
-        public static Integer eval(Integer x, Integer y) {
-            return x + y;
-        }
-    }
-
-    public static class MyStaticSumFunction {
-        public static long init() {
-            return 0L;
-        }
-        public static long add(long accumulator, int v) {
-            return accumulator + v;
-        }
-    }
-
-    public static class MySumFunction {
-        public MySumFunction() {
-        }
-        public long init() {
-            return 0L;
-        }
-        public long add(long accumulator, int v) {
-            return accumulator + v;
-        }
-        public long result(long accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static CalciteState sqlOverDummyTable(String sql)
-            throws RelConversionException, ValidationException, 
SqlParseException {
-        SchemaPlus schema = Frameworks.createRootSchema(true);
-        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-                (RelDataTypeSystem.DEFAULT);
-        StreamableTable streamableTable = new 
CompilerUtil.TableBuilderInfo(typeFactory)
-                .field("ID", SqlTypeName.INTEGER)
-                .field("NAME", typeFactory.createType(String.class))
-                .field("ADDR", typeFactory.createType(String.class))
-                .build();
-        Table table = streamableTable.stream();
-        schema.add("FOO", table);
-        schema.add("BAR", table);
-        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
-        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-        sqlOperatorTables.add(SqlStdOperatorTable.instance());
-        sqlOperatorTables.add(new 
CalciteCatalogReader(CalciteSchema.from(schema),
-                false,
-                Collections.<String>emptyList(), typeFactory));
-        SqlOperatorTable chainedSqlOperatorTable = new 
ChainedSqlOperatorTable(sqlOperatorTables);
-        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-                schema).operatorTable(chainedSqlOperatorTable).build();
-        Planner planner = Frameworks.getPlanner(config);
-        SqlNode parse = planner.parse(sql);
-        SqlNode validate = planner.validate(parse);
-        RelNode tree = planner.convert(validate);
-        System.out.println(RelOptUtil.toString(tree, 
SqlExplainLevel.ALL_ATTRIBUTES));
-        return new CalciteState(schema, tree);
-    }
-
-    public static CalciteState sqlOverNestedTable(String sql)
-            throws RelConversionException, ValidationException, 
SqlParseException {
-        SchemaPlus schema = Frameworks.createRootSchema(true);
-        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-                (RelDataTypeSystem.DEFAULT);
-
-        StreamableTable streamableTable = new 
CompilerUtil.TableBuilderInfo(typeFactory)
-                .field("ID", SqlTypeName.INTEGER)
-                .field("MAPFIELD",
-                        typeFactory.createTypeWithNullability(
-                                typeFactory.createMapType(
-                                        typeFactory.createTypeWithNullability(
-                                                
typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
-                                        typeFactory.createTypeWithNullability(
-                                                
typeFactory.createSqlType(SqlTypeName.INTEGER), true))
-                                , true))
-                .field("NESTEDMAPFIELD",
-                        typeFactory.createTypeWithNullability(
-                            typeFactory.createMapType(
-                                    typeFactory.createTypeWithNullability(
-                                            
typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
-                                    typeFactory.createTypeWithNullability(
-                                            typeFactory.createMapType(
-                                                    
typeFactory.createTypeWithNullability(
-                                                            
typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
-                                                    
typeFactory.createTypeWithNullability(
-                                                            
typeFactory.createSqlType(SqlTypeName.INTEGER), true))
-                                            , true))
-                                        , true))
-                .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
-                        typeFactory.createArrayType(
-                            typeFactory.createTypeWithNullability(
-                                
typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
-                        , true))
-                .build();
-        Table table = streamableTable.stream();
-        schema.add("FOO", table);
-        schema.add("BAR", table);
-        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-        sqlOperatorTables.add(SqlStdOperatorTable.instance());
-        sqlOperatorTables.add(new 
CalciteCatalogReader(CalciteSchema.from(schema),
-                                                       false,
-                                                       
Collections.<String>emptyList(), typeFactory));
-        SqlOperatorTable chainedSqlOperatorTable = new 
ChainedSqlOperatorTable(sqlOperatorTables);
-        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-                schema).operatorTable(chainedSqlOperatorTable).build();
-        Planner planner = Frameworks.getPlanner(config);
-        SqlNode parse = planner.parse(sql);
-        SqlNode validate = planner.validate(parse);
-        RelNode tree = planner.convert(validate);
-        System.out.println(RelOptUtil.toString(tree, 
SqlExplainLevel.ALL_ATTRIBUTES));
-        return new CalciteState(schema, tree);
-    }
-
-    public static class CalciteState {
-        final SchemaPlus schema;
-        final RelNode tree;
-
-        private CalciteState(SchemaPlus schema, RelNode tree) {
-            this.schema = schema;
-            this.tree = tree;
-        }
-
-        public SchemaPlus schema() {
-            return schema;
-        }
-
-        public RelNode tree() {
-            return tree;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
deleted file mode 100644
index 3226810..0000000
--- 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.storm.tuple.Values;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.TestUtils;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestPlanCompiler {
-  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-
-  @Test
-  public void testCompile() throws Exception {
-    String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    AbstractValuesProcessor proc = compiler.compile(state.tree());
-    Map<String, DataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockDataSource());
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    proc.initialize(data, h);
-    Assert.assertArrayEquals(new Values[] { new Values(4), new Values(5)},
-                             values.toArray());
-  }
-
-  @Test
-  public void testLogicalExpr() throws Exception {
-    String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND 
ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    AbstractValuesProcessor proc = compiler.compile(state.tree());
-    Map<String, DataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockDataSource());
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    proc.initialize(data, h);
-    Assert.assertEquals(new Values(true, false, true), values.get(0));
-  }
-
-  @Test
-  public void testNested() throws Exception {
-    String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
-            "FROM FOO " +
-            "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverNestedTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    AbstractValuesProcessor proc = compiler.compile(state.tree());
-    Map<String, DataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockNestedDataSource());
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    proc.initialize(data, h);
-    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
-    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-    Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 
300)), values.get(0));
-  }
-
-  @Test
-  public void testUdf() throws Exception {
-    String sql = "SELECT MYPLUS(ID, 3)" +
-            "FROM FOO " +
-            "WHERE ID = 2";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverNestedTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    AbstractValuesProcessor proc = compiler.compile(state.tree());
-    Map<String, DataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockDataSource());
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    proc.initialize(data, h);
-    Assert.assertEquals(new Values(5), values.get(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
deleted file mode 100644
index 4bee9aa..0000000
--- 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.containsString;
-
-public class TestRelNodeCompiler {
-  @Test
-  public void testFilter() throws Exception {
-    String sql = "SELECT ID + 1 FROM FOO WHERE ID > 3";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
-        RelDataTypeSystem.DEFAULT);
-    LogicalProject project = (LogicalProject) state.tree();
-    LogicalFilter filter = (LogicalFilter) project.getInput();
-
-    try (StringWriter sw = new StringWriter();
-         PrintWriter pw = new PrintWriter(sw)
-    ) {
-      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      // standalone mode doesn't use inputstreams argument
-      compiler.visitFilter(filter, Collections.EMPTY_LIST);
-      pw.flush();
-      Assert.assertThat(sw.toString(), containsString("> 3"));
-    }
-
-    try (StringWriter sw = new StringWriter();
-         PrintWriter pw = new PrintWriter(sw)
-    ) {
-      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      // standalone mode doesn't use inputstreams argument
-      compiler.visitProject(project, Collections.EMPTY_LIST);
-      pw.flush();
-      Assert.assertThat(sw.toString(), containsString(" + 1"));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java
 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java
new file mode 100644
index 0000000..16eb852
--- /dev/null
+++ 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.compiler.backends.trident;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import javax.annotation.Nullable;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.storm.tuple.Values;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Only for testing expression. We're leveraging Avatica to make tests faster.
+ */
+public class TestExpressions {
+    @Test
+    public void testLogicalExpr() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList("s.\"id\" > 0 OR s.\"id\" < 1", "s.\"id\" > 
0 AND s.\"id\" < 1",
+                        "NOT (s.\"id\" > 0 AND s.\"id\" < 1)"));
+        assertEquals(new Values(true, false, true), v);
+    }
+
+    @Test
+    public void testExpectOperator() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList("TRUE IS TRUE", "TRUE IS NOT TRUE",
+                        "UNKNOWN IS TRUE", "UNKNOWN IS NOT TRUE",
+                        "TRUE IS FALSE", "UNKNOWN IS NULL",
+                        "UNKNOWN IS NOT NULL"));
+        assertEquals(new Values(true, false, false, true, false, true, false), 
v);
+    }
+
+    @Test
+    public void testDistinctBetweenLikeSimilarIn() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList("TRUE IS DISTINCT FROM TRUE",
+                        "TRUE IS NOT DISTINCT FROM FALSE", "3 BETWEEN 1 AND 5",
+                        "10 NOT BETWEEN 1 AND 5", "'hello' LIKE '_e%'",
+                        "'world' NOT LIKE 'wor%'", "'abc' SIMILAR TO 
'[a-zA-Z]+[cd]{1}'",
+                        "'abe' NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'", "'3' IN 
('1', '2', '3', '4')",
+                        "2 NOT IN (1, 3, 5)"));
+        assertEquals(new Values(false, false, true, true, true,
+                false, true, true, true, true), v);
+    }
+
+    @Test
+    public void testCaseStatement() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "CASE WHEN 'abcd' IN ('a', 'abc', 'abcde') THEN 
UPPER('a') " +
+                                "WHEN UPPER('abcd') = 'AB' THEN 'b' ELSE {fn 
CONCAT('abcd', '#')} END",
+                        "CASE WHEN 'ab' IN ('a', 'abc', 'abcde') THEN 
UPPER('a') " +
+                                "WHEN UPPER('ab') = 'AB' THEN 'b' ELSE {fn 
CONCAT('ab', '#')} END",
+                        "CASE WHEN 'abc' IN ('a', 'abc', 'abcde') THEN 
UPPER('a') " +
+                                "WHEN UPPER('abc') = 'AB' THEN 'b' ELSE {fn 
CONCAT('abc', '#')} END"
+                )
+        );
+
+        // TODO: The data type of literal Calcite assigns seems to be out of 
expectation. Please see below logical plan.
+        // LogicalProject(EXPR$0=[CASE(OR(=('abcd', 'a'), =('abcd', 'abc'), 
=('abcd', 'abcde')), CAST(UPPER('a')):VARCHAR(5) CHARACTER SET "ISO-8859-1" 
COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abcd'), 
CAST('AB'):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL), 'b', CAST(||('abcd', '#')):VARCHAR(5) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], 
EXPR$1=[CASE(OR(=('ab', 'a'), =('ab', 'abc'), =('ab', 'abcde')), 
CAST(UPPER('a')):CHAR(3) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('ab'), 'AB'), CAST('b'):CHAR(3) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, 
||('ab', '#'))], EXPR$2=[CASE(OR(=('abc', 'a'), =('abc', 'abc'), =('abc', 
'abcde')), CAST(UPPER('a')):CHAR(4) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abc'), CAST('AB'):CHAR(3) 
CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 
CAST('b'):CHAR(
 4) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, 
||('abc', '#'))]): rowcount = 1.0, cumulative cost = {2.0 rows, 5.0 cpu, 0.0 
io}, id = 5
+        //   LogicalFilter(condition=[AND(>($0, 0), <($0, 2))]): rowcount = 
1.0, cumulative cost = {1.0 rows, 2.0 cpu, 0.0 io}, id = 4
+        //     EnumerableTableScan(table=[[FOO]]): rowcount = 1.0, cumulative 
cost = {0.0 rows, 1.0 cpu, 0.0 io}, id = 3
+        // in result, both 'b' and UPPER('a') hence 'A' are having some spaces 
which is not expected.
+        // When we use CASE with actual column (Java String type hence 
VARCHAR), it seems to work as expected.
+        // Please refer trident/TestPlanCompiler#testCaseStatement(), and see 
below logical plan.
+        // LogicalProject(EXPR$0=[CASE(OR(=($1, 'a'), =($1, 'abc'), =($1, 
'abcde')), CAST(UPPER('a')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary", =(CAST(UPPER($1)):VARCHAR(2) CHARACTER SET 
"ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary", 'AB'), 'b', CAST(||($1, 
'#')):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE 
"ISO-8859-1$en_US$primary")]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 
cpu, 0.0 io}, id = 3
+        List<Object> v2 = Lists.transform(v, new Function<Object, Object>() {
+            @Nullable
+            @Override
+            public String apply(@Nullable Object o) {
+                return ((String) o).trim();
+            }
+        });
+        assertArrayEquals(new Values("abcd#", "b", "A").toArray(), 
v2.toArray());
+    }
+
+    @Test
+    public void testNullIfAndCoalesce() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "NULLIF(5, 5)", "NULLIF(5, 0)", "COALESCE(NULL, NULL, 
5, 4, NULL)", "COALESCE(1, 5)"
+                ));
+        assertEquals(new Values(null, 5, 5, 1), v);
+    }
+
+    @Test
+    public void testCollectionFunctions() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "ELEMENT(ARRAY[3])", "CARDINALITY(ARRAY[1, 2, 3, 4, 
5])"
+                ));
+        assertEquals(new Values(3, 5), v);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testElementFunctionMoreThanOneValue() throws Exception {
+        testExpr(
+                Lists.newArrayList(
+                        "ELEMENT(ARRAY[1, 2, 3])"
+                ));
+        fail("ELEMENT with array which has multiple elements should throw 
exception in runtime.");
+    }
+
+    @Test
+    public void testArithmeticWithNull() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "1 + CAST(NULL AS INT)", "CAST(NULL AS INT) + 1", 
"CAST(NULL AS INT) + CAST(NULL AS INT)", "1 + 2"
+                ));
+        assertEquals(new Values(null, null, null, 3), v);
+    }
+
+    @Test
+    public void testNotWithNull() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "NOT TRUE", "NOT FALSE", "NOT UNKNOWN"
+                ));
+        assertEquals(new Values(false, true, null), v);
+    }
+
+    @Test
+    public void testAndWithNull() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "UNKNOWN AND TRUE", "UNKNOWN AND FALSE", "UNKNOWN AND 
UNKNOWN",
+                        "TRUE AND TRUE", "TRUE AND FALSE", "TRUE AND UNKNOWN",
+                        "FALSE AND TRUE", "FALSE AND FALSE", "FALSE AND 
UNKNOWN"
+                ));
+        assertEquals(new Values(null, false, null, true, false, null, false,
+                false, false), v);
+    }
+
+    @Test
+    public void testAndWithNullable() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "s.\"addr\" = 'a' AND s.\"name\" = 'a'", "s.\"name\" = 
'a' AND s.\"addr\" = 'a'", "s.\"name\" = 'x' AND s.\"addr\" = 'a'", "s.\"addr\" 
= 'a' AND s.\"name\" = 'x'"
+                ));
+        assertEquals(new Values(false, false, null, null), v);
+    }
+
+    @Test
+    public void testOrWithNullable() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "s.\"addr\" = 'a'  OR s.\"name\" = 'a'", "s.\"name\" = 
'a' OR s.\"addr\" = 'a' ", "s.\"name\" = 'x' OR s.\"addr\" = 'a' ", "s.\"addr\" 
= 'a'  OR s.\"name\" = 'x'"
+                ));
+        assertEquals(new Values(null, null, true, true), v);
+    }
+
+    @Test
+    public void testOrWithNull() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "UNKNOWN OR TRUE", "UNKNOWN OR FALSE", "UNKNOWN OR 
UNKNOWN",
+                        "TRUE OR TRUE", "TRUE OR FALSE", "TRUE OR UNKNOWN",
+                        "FALSE OR TRUE", "FALSE OR FALSE", "FALSE OR UNKNOWN"
+                ));
+        assertEquals(new Values(true, null, null, true, true, true, true,
+                false, null), v);
+    }
+
+    @Test
+    public void testEquals() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "1 = 2", "UNKNOWN = UNKNOWN", "'a' = 'a'", "'a' = 
UNKNOWN", "UNKNOWN = 'a'", "'a' = 'b'",
+                        "1 <> 2", "UNKNOWN <> UNKNOWN", "'a' <> 'a'", "'a' <> 
UNKNOWN", "UNKNOWN <> 'a'", "'a' <> 'b'"
+                ));
+        assertEquals(new Values(false, null, true, null, null, false,
+                true, null, false, null, null, true), v);
+    }
+
+    @Test
+    public void testArithmeticFunctions() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "POWER(3, 2)", "ABS(-10)", "MOD(10, 3)", "MOD(-10, 3)",
+                        "CEIL(123.45)", "FLOOR(123.45)"
+                ));
+
+        assertEquals(new Values(9.0d, 10, 1, -1, new BigDecimal(124), new 
BigDecimal(123)), v);
+
+        // Belows are floating numbers so comparing this with literal is tend 
to be failing...
+        // Picking int value and compare
+        List<Object> v2 = testExpr(
+                Lists.newArrayList(
+                        "SQRT(255)", "LN(16)", "LOG10(10000)", "EXP(10)"
+                ));
+        List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
+            @Nullable
+            @Override
+            public Object apply(@Nullable Object o) {
+                // only takes int value
+                return ((Number) o).intValue();
+            }
+        });
+
+        // 15.9687, 2.7725, 4.0, 22026.465794
+        assertEquals(new Values(15, 2, 4, 22026), v2m);
+    }
+
+    @Test
+    public void testStringFunctions() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "'ab' || 'cd'", "CHAR_LENGTH('foo')", 
"CHARACTER_LENGTH('foo')",
+                        "UPPER('a')", "LOWER('A')", "POSITION('bc' IN 'abcd')",
+                        "TRIM(BOTH ' ' FROM '  abcdeabcdeabc  ')",
+                        "TRIM(LEADING ' ' FROM '  abcdeabcdeabc  ')",
+                        "TRIM(TRAILING ' ' FROM '  abcdeabcdeabc  ')",
+                        "OVERLAY('abcde' PLACING 'bc' FROM 3)",
+                        "SUBSTRING('abcde' FROM 3)", "SUBSTRING('abcdeabcde' 
FROM 3 FOR 4)",
+                        "INITCAP('foo')"
+                ));
+        assertEquals(new Values("abcd", 3, 3, "A", "a", 2, "abcdeabcdeabc", 
"abcdeabcdeabc  ", "  abcdeabcdeabc", "abbce", "cde", "cdea", "Foo"), v);
+    }
+
+    @Test
+    public void testJDBCNumericFunctions() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "{fn POWER(3, 2)}", "{fn ABS(-10)}", "{fn MOD(10, 
3)}", "{fn MOD(-10, 3)}"
+                ));
+
+        assertEquals(new Values(9.0d, 10, 1, -1), v);
+
+        // Belows are floating numbers so comparing this with literal is tend 
to be failing...
+        // Picking int value and compare
+        List<Object> v2 = testExpr(
+                Lists.newArrayList(
+                        "{fn LOG(16)}", "{fn LOG10(10000)}", "{fn EXP(10)}"
+                ));
+        List<Object> v2m = Lists.transform(v2, new Function<Object, Object>() {
+            @Nullable
+            @Override
+            public Object apply(@Nullable Object o) {
+                // only takes int value
+                return ((Number) o).intValue();
+            }
+        });
+
+        // 2.7725, 4.0, 22026.465794
+        assertEquals(new Values(2, 4, 22026), v2m);
+    }
+
+    @Test
+    public void testJDBCStringFunctions() throws Exception {
+        List<Object> v = testExpr(
+                Lists.newArrayList(
+                        "{fn CONCAT('ab', 'cd')}",
+                        "{fn LOCATE('bc', 'abcdeabcde')}",
+                        "{fn LOCATE('bc', 'abcdeabcde', 4)}",
+                        "{fn INSERT('abcd', 2, 3, 'de')}",
+                        "{fn LCASE('AbCdE')}",
+                        "{fn LENGTH('AbCdE')}",
+                        "{fn LTRIM('  abcde  ')}",
+                        "{fn RTRIM('  abcde  ')}",
+                        "{fn SUBSTRING('abcdeabcde', 3, 4)}",
+                        "{fn UCASE('AbCdE')}"
+                )
+        );
+
+        assertEquals(new Values("abcd", 2, 7, "ade", "abcde", 5, "abcde  ", "  
abcde", "cdea", "ABCDE"), v);
+    }
+
+    private List<Object> testExpr(List<String> exprs) throws Exception {
+        Class.forName("org.apache.calcite.jdbc.Driver");
+        Connection connection =
+                DriverManager.getConnection("jdbc:calcite:");
+        CalciteConnection calciteConnection =
+                connection.unwrap(CalciteConnection.class);
+        SchemaPlus rootSchema = calciteConnection.getRootSchema();
+        rootSchema.add("expr", new ReflectiveSchema(new ExprDatabase()));
+        Statement statement = connection.createStatement();
+        ResultSet resultSet =
+                statement.executeQuery("select " + Joiner.on(',').join(exprs) 
+ " \n"
+                        + "from \"expr\".\"expressions\" as s\n"
+                        + " WHERE s.\"id\" > 0 AND s.\"id\" < 2");
+
+        List<Object> result = null;
+        while (resultSet.next()) {
+            if (result != null) {
+                Assert.fail("The query is expected to have only one result 
row");
+            }
+
+            int n = resultSet.getMetaData().getColumnCount();
+
+            result = new ArrayList<>();
+            for (int i = 1 ; i <= n; i++) {
+                result.add(resultSet.getObject(i));
+            }
+        }
+
+        resultSet.close();
+        statement.close();
+        connection.close();
+
+        return result;
+    }
+
+    public static class ExprDatabase {
+        public final ExpressionTable[] expressions = {
+            new ExpressionTable(0, "x", null),
+            new ExpressionTable(1, "x", null),
+            new ExpressionTable(2, "x", null),
+            new ExpressionTable(3, "x", null),
+            new ExpressionTable(4, "x", null)
+        };
+    }
+
+    public static class ExpressionTable {
+        public final int id;
+        public final String name;
+        public final String addr;
+
+        public ExpressionTable(int id, String name, String addr) {
+            this.id = id;
+            this.name = name;
+            this.addr = addr;
+        }
+    }
+
+}

Reply via email to