STORM-2125 Use Calcite's implementation of Rex Compiler

* Upgrade Calcite to latest 1.10.0
  * address CALCITE-1386 to keep supporting nested map / array
    * try our best to keep runtime safety with nested map / array types
    * when given detailed type information, it supports full features of nested 
map / array
* borrow JaninoRexCompiler and modify to return Expression instead of Scalar 
(RexNodeToBlockStatementCompiler)
  * since we need to pass a code block to Bolts, not an executable method 
implementation on the classloader
* modify codebase to use RexNodeToBlockStatementCompiler
  * remove ExprCompiler and its test
* add tests for all of scalar functions Calcite supports
  * we don't use Calcite aggregate functions and window functions so no need to 
add them now
* add a simple subquery unit test
  * need to dig in and add more cases of subqueries later
* Fix aggregate fields issue with Trident behavior
* Fix DATE/TIME unit tests with timezone issue
* address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dec76a7e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dec76a7e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dec76a7e

Branch: refs/heads/1.x-branch
Commit: dec76a7e957d8f2f2c283becf6bc1ef53a191e1b
Parents: cdcfaef
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Thu Sep 22 15:38:44 2016 +0900
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Wed Oct 12 18:26:29 2016 +0900

----------------------------------------------------------------------
 external/sql/storm-sql-core/pom.xml             |  25 -
 .../storm-sql-core/src/codegen/data/Parser.tdd  |   6 +-
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |   9 +-
 .../apache/storm/sql/compiler/ExprCompiler.java | 523 -------------------
 .../RexNodeToBlockStatementCompiler.java        | 102 ++++
 .../backends/standalone/PlanCompiler.java       |  16 +-
 .../backends/standalone/RelNodeCompiler.java    |  68 ++-
 .../compiler/backends/trident/PlanCompiler.java |   9 +-
 .../trident/TridentLogicalPlanCompiler.java     |  84 +--
 .../test/org/apache/storm/sql/TestStormSql.java |  70 ++-
 .../storm/sql/compiler/TestCompilerUtils.java   |  34 +-
 .../storm/sql/compiler/TestExprCompiler.java    |  93 ----
 .../storm/sql/compiler/TestExprSemantic.java    | 262 +++++++++-
 .../backends/standalone/TestPlanCompiler.java   |   6 +-
 .../standalone/TestRelNodeCompiler.java         |   2 +-
 .../backends/trident/TestPlanCompiler.java      | 143 ++++-
 external/sql/storm-sql-runtime/pom.xml          |  13 -
 .../calcite/interpreter/StormContext.java       |  31 ++
 .../sql/runtime/calcite/StormDataContext.java   |  79 +++
 .../trident/functions/EvaluationFilter.java     |  21 +-
 .../trident/functions/EvaluationFunction.java   |  22 +-
 .../test/org/apache/storm/sql/TestUtils.java    |  91 +++-
 pom.xml                                         |  11 +-
 23 files changed, 923 insertions(+), 797 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml 
b/external/sql/storm-sql-core/pom.xml
index f2e29ca..b8790a4 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -61,36 +61,11 @@
             <version>${calcite.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>commons-dbcp</groupId>
-                    <artifactId>commons-dbcp</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.code.findbugs</groupId>
-                    <artifactId>jsr305</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.janino</groupId>
-                    <artifactId>janino</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.janino</groupId>
-                    <artifactId>commons-compiler</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-annotations</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-        <!-- janino -->
-        <dependency>
-            <groupId>org.codehaus.janino</groupId>
-            <artifactId>janino</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd 
b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
index 1e921c7..22a60f6 100644
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -52,6 +52,9 @@
   dataTypeParserMethods: [
   ]
 
+  nonReservedKeywords: [
+  ]
+
   # List of files in @includes directory that have parser method
   # implementations for custom SQL statements, literals or types
   # given as part of "statementParserMethods", "literalParserMethods" or
@@ -62,5 +65,6 @@
 
   includeCompoundIdentifier: true,
   includeBraces: true,
-  includeAdditionalDeclarations: false
+  includeAdditionalDeclarations: false,
+  allowBangEqual: false
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index 61f3b6f..368bbea 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.sql;
 
+import org.apache.calcite.DataContext;
+import org.apache.storm.sql.runtime.calcite.StormDataContext;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.prepare.CalciteCatalogReader;
@@ -41,18 +43,15 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
 import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
-import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.parser.ColumnConstraint;
 import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
 import org.apache.storm.sql.runtime.*;
-import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
 import org.apache.storm.trident.TridentTopology;
 
 import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -66,7 +65,6 @@ import java.util.Map;
 import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
 
 import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
 
@@ -115,13 +113,14 @@ class StormSqlImpl extends StormSql {
       } else if (node instanceof SqlCreateFunction) {
         handleCreateFunction((SqlCreateFunction) node);
       }  else {
+        DataContext dataContext = new StormDataContext();
         FrameworkConfig config = buildFrameWorkConfig();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
         RelNode tree = planner.convert(validate);
         org.apache.storm.sql.compiler.backends.trident.PlanCompiler compiler =
-                new 
org.apache.storm.sql.compiler.backends.trident.PlanCompiler(dataSources, 
typeFactory);
+                new 
org.apache.storm.sql.compiler.backends.trident.PlanCompiler(dataSources, 
typeFactory, dataContext);
         TridentTopology topo = compiler.compile(tree);
         Path jarPath = null;
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
deleted file mode 100644
index 4e1c127..0000000
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Primitives;
-import org.apache.calcite.adapter.enumerable.NullPolicy;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.tree.Primitive;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.*;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.impl.ReflectiveFunctionBase;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
-import org.apache.calcite.util.BuiltInMethod;
-import org.apache.calcite.util.NlsString;
-import org.apache.calcite.util.Util;
-import org.apache.storm.sql.runtime.StormSqlFunctions;
-
-import java.io.PrintWriter;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.*;
-
-/**
- * Compile RexNode on top of the Tuple abstraction.
- */
-public class ExprCompiler implements RexVisitor<String> {
-  private final PrintWriter pw;
-  private final JavaTypeFactory typeFactory;
-  private static final ImpTable IMP_TABLE = new ImpTable();
-  private int nameCount;
-
-  public ExprCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
-    this.pw = pw;
-    this.typeFactory = typeFactory;
-  }
-
-  @Override
-  public String visitInputRef(RexInputRef rexInputRef) {
-    String name = reserveName();
-    String typeName = javaTypeName(rexInputRef);
-    String boxedTypeName = boxedJavaTypeName(rexInputRef);
-    pw.print(String.format("%s %s = (%s)(_data.get(%d));\n", typeName, name,
-                           boxedTypeName, rexInputRef.getIndex()));
-    return name;
-  }
-
-  @Override
-  public String visitLocalRef(RexLocalRef rexLocalRef) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String visitLiteral(RexLiteral rexLiteral) {
-    Object v = rexLiteral.getValue();
-    RelDataType ty = rexLiteral.getType();
-    switch(rexLiteral.getTypeName()) {
-      case BOOLEAN:
-        return v.toString();
-      case CHAR:
-        return CompilerUtil.escapeJavaString(((NlsString) v).getValue(), true);
-      case NULL:
-        return "((" + 
((Class<?>)typeFactory.getJavaClass(ty)).getCanonicalName() + ")null)";
-      case DOUBLE:
-      case BIGINT:
-      case DECIMAL:
-        switch (ty.getSqlTypeName()) {
-          case TINYINT:
-          case SMALLINT:
-          case INTEGER:
-            return Long.toString(((BigDecimal) v).longValueExact());
-          case BIGINT:
-            return Long.toString(((BigDecimal)v).longValueExact()) + 'L';
-          case DECIMAL:
-          case FLOAT:
-          case REAL:
-          case DOUBLE:
-            return Util.toScientificNotation((BigDecimal) v);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException();
-    }
-    return null;
-  }
-
-  @Override
-  public String visitCall(RexCall rexCall) {
-    return IMP_TABLE.compile(this, rexCall);
-  }
-
-  @Override
-  public String visitOver(RexOver rexOver) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String visitCorrelVariable(
-      RexCorrelVariable rexCorrelVariable) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String visitDynamicParam(
-      RexDynamicParam rexDynamicParam) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String visitRangeRef(RexRangeRef rexRangeRef) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String visitFieldAccess(
-      RexFieldAccess rexFieldAccess) {
-    throw new UnsupportedOperationException();
-  }
-
-  private String javaTypeName(RexNode node) {
-    Type ty = typeFactory.getJavaClass(node.getType());
-    return ((Class<?>)ty).getCanonicalName();
-  }
-
-  private String boxedJavaTypeName(RexNode node) {
-    Type ty = typeFactory.getJavaClass(node.getType());
-    Class clazz = (Class<?>)ty;
-    if (clazz.isPrimitive()) {
-      clazz = Primitives.wrap(clazz);
-    }
-
-    return clazz.getCanonicalName();
-  }
-
-  private String reserveName() {
-    return "t" + ++nameCount;
-  }
-
-  // Only generate inline expressions when comparing primitive types
-  private boolean primitiveCompareExpr(SqlOperator op, RelDataType type) {
-    final Primitive primitive = 
Primitive.ofBoxOr(typeFactory.getJavaClass(type));
-    return primitive != null &&
-        (op == LESS_THAN || op == LESS_THAN_OR_EQUAL || op == GREATER_THAN || 
op == GREATER_THAN_OR_EQUAL);
-  }
-
-  private interface CallExprPrinter {
-    String translate(ExprCompiler compiler, RexCall call);
-  }
-
-  /**
-   * Inspired by Calcite's RexImpTable, the ImpTable class maps the operators
-   * to their corresponding implementation that generates the expressions in
-   * the format of Java source code.
-   */
-  private static class ImpTable {
-    private final Map<SqlOperator, CallExprPrinter> translators;
-
-    private ImpTable() {
-      ImmutableMap.Builder<SqlOperator, CallExprPrinter> builder =
-          ImmutableMap.builder();
-      builder
-          .put(builtInMethod(UPPER, BuiltInMethod.UPPER, NullPolicy.STRICT))
-          .put(builtInMethod(LOWER, BuiltInMethod.LOWER, NullPolicy.STRICT))
-          .put(builtInMethod(INITCAP, BuiltInMethod.INITCAP, 
NullPolicy.STRICT))
-          .put(builtInMethod(SUBSTRING, BuiltInMethod.SUBSTRING, 
NullPolicy.STRICT))
-          .put(builtInMethod(CHARACTER_LENGTH, BuiltInMethod.CHAR_LENGTH, 
NullPolicy.STRICT))
-          .put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, 
NullPolicy.STRICT))
-          .put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, 
NullPolicy.STRICT))
-          .put(builtInMethod(ITEM, BuiltInMethod.ANY_ITEM, NullPolicy.STRICT))
-          .put(infixBinary(LESS_THAN, "<", "lt"))
-          .put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le"))
-          .put(infixBinary(GREATER_THAN, ">", "gt"))
-          .put(infixBinary(GREATER_THAN_OR_EQUAL, ">=", "ge"))
-          .put(infixBinary(EQUALS, "==", StormSqlFunctions.class, "eq"))
-          .put(infixBinary(NOT_EQUALS, "<>", StormSqlFunctions.class, "ne"))
-          .put(infixBinary(PLUS, "+", "plus"))
-          .put(infixBinary(MINUS, "-", "minus"))
-          .put(infixBinary(MULTIPLY, "*", "multiply"))
-          .put(infixBinary(DIVIDE, "/", "divide"))
-          .put(infixBinary(DIVIDE_INTEGER, "/", "divide"))
-          .put(expect(IS_NULL, null))
-          .put(expectNot(IS_NOT_NULL, null))
-          .put(expect(IS_TRUE, true))
-          .put(expectNot(IS_NOT_TRUE, true))
-          .put(expect(IS_FALSE, false))
-          .put(expectNot(IS_NOT_FALSE, false))
-          .put(AND, AND_EXPR)
-          .put(OR, OR_EXPR)
-          .put(NOT, NOT_EXPR)
-          .put(CAST, CAST_EXPR);
-      this.translators = builder.build();
-    }
-
-    private CallExprPrinter getCallExprPrinter(SqlOperator op) {
-      if (translators.containsKey(op)) {
-        return translators.get(op);
-      } else if (op instanceof SqlUserDefinedFunction) {
-        Function function = ((SqlUserDefinedFunction) op).getFunction();
-        if (function instanceof ReflectiveFunctionBase) {
-          Method method = ((ReflectiveFunctionBase) function).method;
-          return methodCall(op, method, NullPolicy.STRICT).getValue();
-        }
-      }
-      return null;
-    }
-
-    private String compile(ExprCompiler compiler, RexCall call) {
-      SqlOperator op = call.getOperator();
-      CallExprPrinter printer = getCallExprPrinter(op);
-      if (printer == null) {
-        throw new UnsupportedOperationException();
-      } else {
-        return printer.translate(compiler, call);
-      }
-    }
-
-    private Map.Entry<SqlOperator, CallExprPrinter> methodCall(
-            final SqlOperator op, final Method method, NullPolicy nullPolicy) {
-      if (nullPolicy != NullPolicy.STRICT) {
-        throw new UnsupportedOperationException();
-      }
-      CallExprPrinter printer = new CallExprPrinter() {
-        @Override
-        public String translate(ExprCompiler compiler, RexCall call) {
-          PrintWriter pw = compiler.pw;
-          String val = compiler.reserveName();
-          pw.print(String.format("final %s %s;\n", 
compiler.javaTypeName(call), val));
-          List<String> args = new ArrayList<>();
-          for (RexNode op : call.getOperands()) {
-            args.add(op.accept(compiler));
-          }
-          pw.print("if (false) {}\n");
-          for (int i = 0; i < args.size(); ++i) {
-            String arg = args.get(i);
-            if (call.getOperands().get(i).getType().isNullable()) {
-              pw.print(String.format("else if (%2$s == null) { %1$s = null; 
}\n", val, arg));
-            }
-          }
-          String calc = printMethodCall(method, args);
-          pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
-          return val;
-        }
-      };
-      return new AbstractMap.SimpleImmutableEntry<>(op, printer);
-    }
-
-    private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
-        final SqlOperator op, final BuiltInMethod method, NullPolicy 
nullPolicy) {
-      return methodCall(op, method.method, nullPolicy);
-    }
-
-    private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
-        (final SqlOperator op, final String javaOperator, final Class<?> 
clazz, final String backupMethodName) {
-      CallExprPrinter trans = new CallExprPrinter() {
-        @Override
-        public String translate(
-            ExprCompiler compiler, RexCall call) {
-          int size = call.getOperands().size();
-          assert size == 2;
-          String val = compiler.reserveName();
-          RexNode op0 = call.getOperands().get(0);
-          RexNode op1 = call.getOperands().get(1);
-          PrintWriter pw = compiler.pw;
-          if (backupMethodName != null) {
-            if (!compiler.primitiveCompareExpr(op, op0.getType())) {
-              String lhs = op0.accept(compiler);
-              String rhs = op1.accept(compiler);
-              pw.print(String.format("%s %s = %s;\n", 
compiler.javaTypeName(call), val,
-                  printMethodCall(clazz, backupMethodName, true, 
Lists.newArrayList(lhs, rhs))));
-              return val;
-            }
-          }
-          boolean lhsNullable = op0.getType().isNullable();
-          boolean rhsNullable = op1.getType().isNullable();
-
-          pw.print(String.format("final %s %s;\n", 
compiler.javaTypeName(call), val));
-          String lhs = op0.accept(compiler);
-          String rhs = op1.accept(compiler);
-          pw.print("if (false) {}\n");
-          if (lhsNullable) {
-            String calc = foldNullExpr(String.format("%s %s %s", lhs, 
javaOperator, rhs), "null", op1);
-            pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; 
}\n", val, lhs, calc));
-          }
-          if (rhsNullable) {
-            String calc = foldNullExpr(String.format("%s %s %s", lhs, 
javaOperator, rhs), "null", op0);
-            pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; 
}\n", val, rhs, calc));
-          }
-          String calc = String.format("%s %s %s", lhs, javaOperator, rhs);
-          pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
-          return val;
-        }
-      };
-      return new AbstractMap.SimpleImmutableEntry<>(op, trans);
-    }
-
-    private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
-        (final SqlOperator op, final String javaOperator, final String 
backupMethodName) {
-      return infixBinary(op, javaOperator, SqlFunctions.class, 
backupMethodName);
-    }
-
-    private Map.Entry<SqlOperator, CallExprPrinter> expect(
-        SqlOperator op, final Boolean expect) {
-      return expect0(op, expect, false);
-    }
-
-    private Map.Entry<SqlOperator, CallExprPrinter> expectNot(
-        SqlOperator op, final Boolean expect) {
-      return expect0(op, expect, true);
-    }
-
-    private Map.Entry<SqlOperator, CallExprPrinter> expect0(
-        SqlOperator op, final Boolean expect, final boolean negate) {
-      CallExprPrinter trans = new CallExprPrinter() {
-        @Override
-        public String translate(
-            ExprCompiler compiler, RexCall call) {
-          assert call.getOperands().size() == 1;
-          String val = compiler.reserveName();
-          RexNode operand = call.getOperands().get(0);
-          boolean nullable = operand.getType().isNullable();
-          String op = operand.accept(compiler);
-          PrintWriter pw = compiler.pw;
-          if (!nullable) {
-            if (expect == null) {
-              pw.print(String.format("boolean %s = %b;\n", val, !negate));
-            } else {
-              pw.print(String.format("boolean %s = %s == %b;\n", val, op,
-                                     expect ^ negate));
-            }
-          } else {
-            String expr;
-            if (expect == null) {
-              expr = String.format("%s == null", op);
-            } else {
-              expr = String.format("%s == Boolean.%s", op, expect ? "TRUE" :
-                  "FALSE");
-            }
-            if (negate) {
-              expr = String.format("!(%s)", expr);
-            }
-            pw.print(String.format("boolean %s = %s;\n", val, expr));
-          }
-          return val;
-        }
-      };
-      return new AbstractMap.SimpleImmutableEntry<>(op, trans);
-    }
-
-
-    // If any of the arguments are false, result is false;
-    // else if any arguments are null, result is null;
-    // else true.
-    private static final CallExprPrinter AND_EXPR = new CallExprPrinter() {
-      @Override
-      public String translate(
-          ExprCompiler compiler, RexCall call) {
-        String val = compiler.reserveName();
-        PrintWriter pw = compiler.pw;
-        pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
-                               val));
-        RexNode op0 = call.getOperands().get(0);
-        RexNode op1 = call.getOperands().get(1);
-        boolean lhsNullable = op0.getType().isNullable();
-        boolean rhsNullable = op1.getType().isNullable();
-        String lhs = op0.accept(compiler);
-        if (!lhsNullable) {
-          pw.print(String.format("if (!(%2$s)) { %1$s = false; }\n", val, 
lhs));
-          pw.print("else {\n");
-          String rhs = op1.accept(compiler);
-          pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
-        } else {
-          String foldedLHS = foldNullExpr(
-              String.format("%1$s == null || %1$s", lhs), "true", op0);
-          pw.print(String.format("if (%s) {\n", foldedLHS));
-          String rhs = op1.accept(compiler);
-          String s;
-          if (rhsNullable) {
-            s = foldNullExpr(
-                String.format("(%2$s != null && !(%2$s)) ? Boolean.FALSE : 
((%1$s == null || %2$s == null) ? null : Boolean.TRUE)",
-                              lhs, rhs), "null", op1);
-          } else {
-            s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs);
-          }
-          pw.print(String.format("  %1$s = %2$s;\n", val, s));
-          pw.print(String.format("} else { %1$s = false; }\n", val));
-        }
-        return val;
-      }
-    };
-
-    // If any of the arguments are true, result is true;
-    // else if any arguments are null, result is null;
-    // else false.
-    private static final CallExprPrinter OR_EXPR = new CallExprPrinter() {
-      @Override
-      public String translate(
-          ExprCompiler compiler, RexCall call) {
-        String val = compiler.reserveName();
-        PrintWriter pw = compiler.pw;
-        pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
-                               val));
-        RexNode op0 = call.getOperands().get(0);
-        RexNode op1 = call.getOperands().get(1);
-        boolean lhsNullable = op0.getType().isNullable();
-        boolean rhsNullable = op1.getType().isNullable();
-        String lhs = op0.accept(compiler);
-        if (!lhsNullable) {
-          pw.print(String.format("if (%2$s) { %1$s = true; }\n", val, lhs));
-          pw.print("else {\n");
-          String rhs = op1.accept(compiler);
-          pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
-        } else {
-          String foldedLHS = foldNullExpr(
-              String.format("%1$s == null || !(%1$s)", lhs), "true", op0);
-          pw.print(String.format("if (%s) {\n", foldedLHS));
-          String rhs = op1.accept(compiler);
-          String s;
-          if (rhsNullable) {
-            s = foldNullExpr(
-                String.format("(%2$s != null && %2$s) ? Boolean.TRUE : ((%1$s 
== null || %2$s == null) ? null : Boolean.FALSE)",
-                              lhs, rhs),
-                "null", op1);
-          } else {
-            s = String.format("%2$s ? Boolean.valueOf(%2$s) : %1$s", lhs, rhs);
-          }
-          pw.print(String.format("  %1$s = %2$s;\n", val, s));
-          pw.print(String.format("} else { %1$s = true; }\n", val));
-        }
-        return val;
-      }
-    };
-
-    private static final CallExprPrinter NOT_EXPR = new CallExprPrinter() {
-      @Override
-      public String translate(
-          ExprCompiler compiler, RexCall call) {
-        String val = compiler.reserveName();
-        PrintWriter pw = compiler.pw;
-        RexNode op = call.getOperands().get(0);
-        String lhs = op.accept(compiler);
-        boolean nullable = call.getType().isNullable();
-        pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
-                               val));
-        if (!nullable) {
-          pw.print(String.format("%1$s = !(%2$s);\n", val, lhs));
-        } else {
-          String s = foldNullExpr(
-              String.format("%1$s == null ? null : !(%1$s)", lhs), "null", op);
-          pw.print(String.format("%1$s = %2$s;\n", val, s));
-        }
-        return val;
-      }
-    };
-
-
-    private static final CallExprPrinter CAST_EXPR = new CallExprPrinter() {
-      @Override
-      public String translate(
-              ExprCompiler compiler, RexCall call) {
-        String val = compiler.reserveName();
-        PrintWriter pw = compiler.pw;
-        RexNode op = call.getOperands().get(0);
-        String lhs = op.accept(compiler);
-        pw.print(String.format("final %1$s %2$s = (%1$s) %3$s;\n",
-                               compiler.boxedJavaTypeName(call), val, lhs));
-        return val;
-      }
-    };
-  }
-
-  private static String foldNullExpr(String notNullExpr, String
-      nullExpr, RexNode op) {
-    if (op instanceof RexLiteral && ((RexLiteral)op).getTypeName() == 
SqlTypeName.NULL) {
-      return nullExpr;
-    } else {
-      return notNullExpr;
-    }
-  }
-
-  public static String printMethodCall(Method method, List<String> args) {
-    return printMethodCall(method.getDeclaringClass(), method.getName(),
-        Modifier.isStatic(method.getModifiers()), args);
-  }
-
-  private static String printMethodCall(Class<?> clazz, String method, boolean 
isStatic, List<String> args) {
-    if (isStatic) {
-      return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, 
Joiner.on(',').join(args));
-    } else {
-      return String.format("%s.%s(%s)", args.get(0), method,
-          Joiner.on(',').join(args.subList(1, args.size())));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
new file mode 100644
index 0000000..b9be471
--- /dev/null
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) into an expression ({@link org.apache.calcite.linq4j.tree.Expression}).
+ *
+ * This code is inspired by Calcite's JaninoRexCompiler. While JaninoRexCompiler returns an executable {@link org.apache.calcite.interpreter.Scalar},
+ * we instead need the generated source code itself, so that it can be passed to EvaluationFilter or EvaluationFunction,
+ * serialized, and then compiled and executed on the worker.
+ */
+public class RexNodeToBlockStatementCompiler {
+  private final RexBuilder rexBuilder;
+
+  public RexNodeToBlockStatementCompiler(RexBuilder rexBuilder) {
+    this.rexBuilder = rexBuilder;
+  }
+
+  public BlockStatement compile(List<RexNode> nodes, RelDataType inputRowType) 
{
+    final RexProgramBuilder programBuilder =
+        new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+    final RexProgram program = programBuilder.getProgram();
+
+    final BlockBuilder builder = new BlockBuilder();
+    final ParameterExpression context_ =
+            Expressions.parameter(Context.class, "context");
+    final ParameterExpression outputValues_ =
+        Expressions.parameter(Object[].class, "outputValues");
+    final JavaTypeFactoryImpl javaTypeFactory =
+        new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+    final RexToLixTranslator.InputGetter inputGetter =
+            new RexToLixTranslator.InputGetterImpl(
+                    ImmutableList.of(
+                            Pair.<Expression, PhysType>of(
+                                    Expressions.field(context_,
+                                            
BuiltInMethod.CONTEXT_VALUES.field),
+                                    PhysTypeImpl.of(javaTypeFactory, 
inputRowType,
+                                            JavaRowFormat.ARRAY, false))));
+    final Function1<String, RexToLixTranslator.InputGetter> correlates =
+            new Function1<String, RexToLixTranslator.InputGetter>() {
+              public RexToLixTranslator.InputGetter apply(String a0) {
+                throw new UnsupportedOperationException();
+              }
+            };
+    final Expression root =
+            Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
+    final List<Expression> list =
+            RexToLixTranslator.translateProjects(program, javaTypeFactory, 
builder,
+                    null, root, inputGetter, correlates);
+    for (int i = 0; i < list.size(); i++) {
+      builder.add(
+              Expressions.statement(
+                      Expressions.assign(
+                              Expressions.arrayIndex(outputValues_,
+                                      Expressions.constant(i)),
+                              list.get(i))));
+    }
+
+    return builder.toBlock();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
index 4c69da1..01546ed 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -24,16 +24,17 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.storm.sql.compiler.CompilerUtil;
 import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.net.URLClassLoader;
-import java.util.ArrayDeque;
 import java.util.HashSet;
-import java.util.Queue;
 import java.util.Set;
 
 public class PlanCompiler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PlanCompiler.class);
+
   private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
   private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
   private static final String PROLOGUE = NEW_LINE_JOINER.join(
@@ -50,7 +51,13 @@ public class PlanCompiler {
       "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
       "import com.google.common.collect.ArrayListMultimap;",
       "import com.google.common.collect.Multimap;",
-      "public final class Processor extends AbstractValuesProcessor {", "");
+      "import org.apache.calcite.interpreter.Context;",
+      "import org.apache.calcite.interpreter.StormContext;",
+      "import org.apache.calcite.DataContext;",
+      "import org.apache.storm.sql.runtime.calcite.StormDataContext;",
+      "public final class Processor extends AbstractValuesProcessor {",
+      "  public final static DataContext dataContext = new 
StormDataContext();",
+      "");
   private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
       "  @Override",
       "  public void initialize(Map<String, DataSource> data,",
@@ -113,6 +120,7 @@ public class PlanCompiler {
 
   public AbstractValuesProcessor compile(RelNode plan) throws Exception {
     String javaCode = generateJavaSource(plan);
+    LOG.debug("Compiling... source code {}", javaCode);
     ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
                                               PACKAGE_NAME + ".Processor",
                                               javaCode, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 5c674ad..98409c1 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -20,6 +20,7 @@ package org.apache.storm.sql.compiler.backends.standalone;
 import com.google.common.base.Joiner;
 import com.google.common.primitives.Primitives;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.tree.BlockStatement;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
@@ -30,18 +31,23 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.AggregateFunction;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.storm.sql.compiler.ExprCompiler;
 import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+import org.apache.storm.sql.compiler.RexNodeToBlockStatementCompiler;
+import org.apache.storm.tuple.Values;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,6 +60,8 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
 
   private final PrintWriter pw;
   private final JavaTypeFactory typeFactory;
+  private final RexNodeToBlockStatementCompiler rexCompiler;
+
   private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
     "  private static final ChannelHandler %1$s = ",
     "    new AbstractChannelHandler() {",
@@ -196,6 +204,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> 
{
   RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
     this.pw = pw;
     this.typeFactory = typeFactory;
+    this.rexCompiler = new RexNodeToBlockStatementCompiler(new 
RexBuilder(typeFactory));
   }
 
   @Override
@@ -207,8 +216,18 @@ class RelNodeCompiler extends 
PostOrderRelNodeVisitor<Void> {
   @Override
   public Void visitFilter(Filter filter, List<Void> inputStreams) throws 
Exception {
     beginStage(filter);
-    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-    String r = filter.getCondition().accept(compiler);
+
+    List<RexNode> childExps = filter.getChildExps();
+    RelDataType inputRowType = filter.getInput(0).getRowType();
+
+    pw.print("Context context = new StormContext(Processor.dataContext);\n");
+    pw.print("context.values = _data.toArray();\n");
+    pw.print("Object[] outputValues = new Object[1];\n");
+
+    BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType);
+    pw.write(codeBlock.toString());
+
+    String r = "((Boolean) outputValues[0])";
     if (filter.getCondition().getType().isNullable()) {
       pw.print(String.format("    if (%s != null && %s) { ctx.emit(_data); 
}\n", r, r));
     } else {
@@ -221,15 +240,19 @@ class RelNodeCompiler extends 
PostOrderRelNodeVisitor<Void> {
   @Override
   public Void visitProject(Project project, List<Void> inputStreams) throws 
Exception {
     beginStage(project);
-    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-    int size = project.getChildExps().size();
-    String[] res = new String[size];
-    for (int i = 0; i < size; ++i) {
-      res[i] = project.getChildExps().get(i).accept(compiler);
-    }
 
-    pw.print(String.format("    ctx.emit(new Values(%s));\n",
-                           Joiner.on(',').join(res)));
+    List<RexNode> childExps = project.getChildExps();
+    RelDataType inputRowType = project.getInput(0).getRowType();
+    int outputCount = project.getRowType().getFieldCount();
+
+    pw.print("Context context = new StormContext(Processor.dataContext);\n");
+    pw.print("context.values = _data.toArray();\n");
+    pw.print(String.format("Object[] outputValues = new Object[%d];\n", 
outputCount));
+
+    BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType);
+    pw.write(codeBlock.toString());
+
+    pw.print("    ctx.emit(new Values(outputValues));\n");
     endStage();
     return null;
   }
@@ -332,7 +355,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> 
{
     }
     args.add(String.format("(%s)accumulators.get(\"%s\")", 
accumulatorType.getCanonicalName(), varName));
     pw.println(String.format("          final %s %s = %s;", 
resultType.getCanonicalName(),
-                             resultName, 
ExprCompiler.printMethodCall(aggFn.resultMethod, args)));
+                             resultName, printMethodCall(aggFn.resultMethod, 
args)));
 
     return resultName;
   }
@@ -386,7 +409,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> 
{
     }
     args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
                            "accumulators.get(\"" + varName + "\")",
-                           ExprCompiler.printMethodCall(aggFn.initMethod, 
args),
+                           printMethodCall(aggFn.initMethod, args),
                            accumulatorType.getCanonicalName()));
     if (argList.isEmpty()) {
       args.add("EMPTY_VALUES");
@@ -397,7 +420,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> 
{
     }
     pw.print(String.format("          accumulators.put(\"%s\", %s);\n",
                            varName,
-                           ExprCompiler.printMethodCall(aggFn.addMethod, 
args)));
+                           printMethodCall(aggFn.addMethod, args)));
   }
 
   private String reserveAggVarName(AggregateCall call) {
@@ -449,4 +472,19 @@ class RelNodeCompiler extends 
PostOrderRelNodeVisitor<Void> {
     }
     return res.toString();
   }
+
+  public static String printMethodCall(Method method, List<String> args) {
+    return printMethodCall(method.getDeclaringClass(), method.getName(),
+            Modifier.isStatic(method.getModifiers()), args);
+  }
+
+  private static String printMethodCall(Class<?> clazz, String method, boolean 
isStatic, List<String> args) {
+    if (isStatic) {
+      return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, 
Joiner.on(',').join(args));
+    } else {
+      return String.format("%s.%s(%s)", args.get(0), method,
+              Joiner.on(',').join(args.subList(1, args.size())));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
index 41777e5..ad98d10 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.storm.sql.compiler.backends.trident;
 
+import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.RelNode;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
@@ -31,16 +32,18 @@ import java.util.Map;
 public class PlanCompiler {
     private Map<String, ISqlTridentDataSource> sources;
     private final JavaTypeFactory typeFactory;
+    private final DataContext dataContext;
 
-    public PlanCompiler(Map<String, ISqlTridentDataSource> sources, 
JavaTypeFactory typeFactory) {
+    public PlanCompiler(Map<String, ISqlTridentDataSource> sources, 
JavaTypeFactory typeFactory, DataContext dataContext) {
         this.sources = sources;
         this.typeFactory = typeFactory;
+        this.dataContext = dataContext;
     }
 
     public AbstractTridentProcessor compileForTest(RelNode plan) throws 
Exception {
         final TridentTopology topology = new TridentTopology();
 
-        TridentLogicalPlanCompiler compiler = new 
TridentLogicalPlanCompiler(sources, typeFactory, topology);
+        TridentLogicalPlanCompiler compiler = new 
TridentLogicalPlanCompiler(sources, typeFactory, topology, dataContext);
         final IAggregatableStream stream = compiler.traverse(plan);
 
         return new AbstractTridentProcessor() {
@@ -59,7 +62,7 @@ public class PlanCompiler {
     public TridentTopology compile(RelNode plan) throws Exception {
         TridentTopology topology = new TridentTopology();
 
-        TridentLogicalPlanCompiler compiler = new 
TridentLogicalPlanCompiler(sources, typeFactory, topology);
+        TridentLogicalPlanCompiler compiler = new 
TridentLogicalPlanCompiler(sources, typeFactory, topology, dataContext);
         compiler.traverse(plan);
 
         return topology;

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
index a0bfa21..8b08121 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
@@ -21,7 +21,9 @@ package org.apache.storm.sql.compiler.backends.trident;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Primitives;
+import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.tree.BlockStatement;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
@@ -32,11 +34,15 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.calcite.util.Pair;
 import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+import org.apache.storm.sql.compiler.RexNodeToBlockStatementCompiler;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
 import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
@@ -92,12 +98,17 @@ class TransformInformation {
 public class TridentLogicalPlanCompiler extends 
PostOrderRelNodeVisitor<IAggregatableStream> {
     protected final Map<String, ISqlTridentDataSource> sources;
     protected final JavaTypeFactory typeFactory;
+    protected final RexNodeToBlockStatementCompiler rexCompiler;
+    private final DataContext dataContext;
     protected TridentTopology topology;
 
-    public TridentLogicalPlanCompiler(Map<String, ISqlTridentDataSource> 
sources, JavaTypeFactory typeFactory, TridentTopology topology) {
+    public TridentLogicalPlanCompiler(Map<String, ISqlTridentDataSource> 
sources, JavaTypeFactory typeFactory,
+                                      TridentTopology topology, DataContext 
dataContext) {
         this.sources = sources;
         this.typeFactory = typeFactory;
         this.topology = topology;
+        this.rexCompiler = new RexNodeToBlockStatementCompiler(new 
RexBuilder(typeFactory));
+        this.dataContext = dataContext;
     }
 
     @Override
@@ -202,26 +213,24 @@ public class TridentLogicalPlanCompiler extends 
PostOrderRelNodeVisitor<IAggrega
 
         // Trident doesn't allow duplicated field name... need to do the 
trick...
         List<String> outputFieldNames = project.getRowType().getFieldNames();
+        int outputCount = outputFieldNames.size();
         List<String> temporaryOutputFieldNames = new ArrayList<>();
         for (String outputFieldName : outputFieldNames) {
             temporaryOutputFieldNames.add("__" + outputFieldName + "__");
         }
 
-        try (StringWriter sw = new StringWriter(); PrintWriter pw = new 
PrintWriter(sw)) {
-            pw.write("import org.apache.storm.tuple.Values;\n");
-
-            ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+        try (StringWriter sw = new StringWriter()) {
+            List<RexNode> childExps = project.getChildExps();
+            RelDataType inputRowType = project.getInput(0).getRowType();
+            BlockStatement codeBlock = rexCompiler.compile(childExps, 
inputRowType);
+            sw.write(codeBlock.toString());
 
-            int size = project.getChildExps().size();
-            String[] res = new String[size];
-            for (int i = 0; i < size; ++i) {
-                res[i] = project.getChildExps().get(i).accept(compiler);
-            }
+            // results go into the out parameter and the return value is unused, but the generated method must still return something or compilation fails
+            sw.write("return 0;");
 
-            pw.write(String.format("\nreturn new Values(%s);", 
Joiner.on(',').join(res)));
             final String expression = sw.toString();
 
-            return inputStream.each(inputFields, new 
EvaluationFunction(expression), new Fields(temporaryOutputFieldNames))
+            return inputStream.each(inputFields, new 
EvaluationFunction(expression, outputCount, dataContext), new 
Fields(temporaryOutputFieldNames))
                     .project(new Fields(temporaryOutputFieldNames))
                     .each(new Fields(temporaryOutputFieldNames), new 
ForwardFunction(), new Fields(outputFieldNames))
                     .project(new Fields(outputFieldNames))
@@ -239,12 +248,17 @@ public class TridentLogicalPlanCompiler extends 
PostOrderRelNodeVisitor<IAggrega
         String stageName = getStageName(filter);
 
         try (StringWriter sw = new StringWriter(); PrintWriter pw = new 
PrintWriter(sw)) {
-            ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-            String ret = filter.getCondition().accept(compiler);
-            pw.write(String.format("\nreturn %s;", ret));
+            List<RexNode> childExps = filter.getChildExps();
+            RelDataType inputRowType = filter.getInput(0).getRowType();
+            BlockStatement codeBlock = rexCompiler.compile(childExps, 
inputRowType);
+            sw.write(codeBlock.toString());
+
+            // results go into the out parameter and the return value is unused, but the generated method must still return something or compilation fails
+            sw.write("return 0;");
+
             final String expression = sw.toString();
 
-            return inputStream.filter(new 
EvaluationFilter(expression)).name(stageName);
+            return inputStream.filter(new EvaluationFilter(expression, 
dataContext)).name(stageName);
         }
     }
 
@@ -257,6 +271,8 @@ public class TridentLogicalPlanCompiler extends 
PostOrderRelNodeVisitor<IAggrega
         Stream inputStream = inputStreams.get(0).toStream();
         String stageName = getStageName(aggregate);
 
+        List<String> outputFieldNames = aggregate.getRowType().getFieldNames();
+
         List<String> groupByFieldNames = new ArrayList<>();
         for (Integer idx : aggregate.getGroupSet()) {
             String fieldName = inputStream.getOutputFields().get(idx);
@@ -268,8 +284,17 @@ public class TridentLogicalPlanCompiler extends 
PostOrderRelNodeVisitor<IAggrega
 
         ChainedAggregatorDeclarer chainedAggregatorDeclarer = 
groupedStream.chainedAgg();
         List<TransformInformation> transformsAfterChained = new ArrayList<>();
-        for (AggregateCall call : aggregate.getAggCallList()) {
-            appendAggregationInChain(chainedAggregatorDeclarer, 
groupByFieldNames, inputStream, call, transformsAfterChained);
+
+        // Trident doesn't allow duplicated field names, so emit temporary ("__name__") fields and rename them back afterwards
+        List<String> temporaryOutputFieldNames = new ArrayList<>();
+        List<String> originOutputFieldNames = new ArrayList<>();
+        for (Pair<AggregateCall, String> aggCallToOutFieldName : 
aggregate.getNamedAggCalls()) {
+            AggregateCall call = aggCallToOutFieldName.getKey();
+            String outFileName = aggCallToOutFieldName.getValue();
+            String tempOutFileName = "__" + outFileName + "__";
+            temporaryOutputFieldNames.add(tempOutFileName);
+            originOutputFieldNames.add(outFileName);
+            appendAggregationInChain(chainedAggregatorDeclarer, 
groupByFieldNames, tempOutFileName, inputStream, call, transformsAfterChained);
         }
 
         Stream stream = chainedAggregatorDeclarer.chainEnd();
@@ -277,14 +302,13 @@ public class TridentLogicalPlanCompiler extends 
PostOrderRelNodeVisitor<IAggrega
             stream = stream.each(transformInformation.getInputFields(), 
transformInformation.getFunction(), transformInformation.getFunctionFields());
         }
 
-        // We're OK to project by Calcite information since each aggregation 
function create output fields via that information
-        List<String> outputFields = aggregate.getRowType().getFieldNames();
-        return stream.project(new Fields(outputFields)).name(stageName);
+        return stream
+                .each(new Fields(temporaryOutputFieldNames), new 
ForwardFunction(), new Fields(originOutputFieldNames))
+                .project(new Fields(outputFieldNames)).name(stageName);
     }
 
     private void appendAggregationInChain(ChainedAggregatorDeclarer 
chainedDeclarer, List<String> groupByFieldNames,
-                                          Stream inputStream, AggregateCall 
call, List<TransformInformation> transformsAfterChained) {
-        String outputField = call.getName();
+                                          String outputFieldName, Stream 
inputStream, AggregateCall call, List<TransformInformation> 
transformsAfterChained) {
         SqlAggFunction aggFunction = call.getAggregation();
         String aggregationName = call.getAggregation().getName();
 
@@ -300,27 +324,27 @@ public class TridentLogicalPlanCompiler extends 
PostOrderRelNodeVisitor<IAggrega
         }
 
         if (aggFunction instanceof SqlUserDefinedAggFunction) {
-            appendUDAFToChain(chainedDeclarer, groupByFieldNames, inputStream, 
call, outputField);
+            appendUDAFToChain(chainedDeclarer, groupByFieldNames, inputStream, 
call, outputFieldName);
         } else {
             switch (aggregationName.toUpperCase()) {
                 case "COUNT":
-                    appendCountFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputField);
+                    appendCountFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputFieldName);
                     break;
 
                 case "MAX":
-                    appendMaxFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputField);
+                    appendMaxFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputFieldName);
                     break;
 
                 case "MIN":
-                    appendMinFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputField);
+                    appendMinFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputFieldName);
                     break;
 
                 case "SUM":
-                    appendSumFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputField);
+                    appendSumFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputFieldName);
                     break;
 
                 case "AVG":
-                    appendAvgFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputField, transformsAfterChained);
+                    appendAvgFunctionToChain(chainedDeclarer, 
groupByFieldNames, inputStream, call, outputFieldName, transformsAfterChained);
                     break;
 
                 default:

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 025219b..b0cffdd 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -18,9 +18,8 @@
 package org.apache.storm.sql;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.calcite.tools.ValidationException;
-import 
org.apache.storm.sql.compiler.backends.standalone.BuiltinAggregateFunctions;
+import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.runtime.ChannelHandler;
 import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
@@ -76,7 +75,7 @@ public class TestStormSql {
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
                                                   String properties, 
List<FieldInfo> fields) {
-      throw new UnsupportedOperationException("Not supported");
+      return new TestUtils.MockSqlTridentDataSource();
     }
   }
 
@@ -96,7 +95,7 @@ public class TestStormSql {
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
                                                   String properties, 
List<FieldInfo> fields) {
-      throw new UnsupportedOperationException("Not supported");
+      return new TestUtils.MockSqlTridentGroupedDataSource();
     }
   }
 
@@ -116,7 +115,7 @@ public class TestStormSql {
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
                                                   String properties, 
List<FieldInfo> fields) {
-      throw new UnsupportedOperationException("Not supported");
+      return new TestUtils.MockSqlTridentJoinDataSourceEmp();
     }
   }
 
@@ -136,7 +135,7 @@ public class TestStormSql {
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
                                                   String properties, 
List<FieldInfo> fields) {
-      throw new UnsupportedOperationException("Not supported");
+      return new TestUtils.MockSqlTridentJoinDataSourceDept();
     }
   }
 
@@ -174,9 +173,9 @@ public class TestStormSql {
   public void testExternalDataSourceNested() throws Exception {
     List<String> stmt = new ArrayList<>();
     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+    stmt.add("SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
                      "FROM FOO " +
-                     "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 
200");
+                     "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND 
CAST(ARRAYFIELD[2] AS INTEGER) = 200");
     StormSql sql = StormSql.construct();
     List<Values> values = new ArrayList<>();
     ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
@@ -184,16 +183,60 @@ public class TestStormSql {
     System.out.println(values);
     Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
     Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-    Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 
300)), values.get(0));
+    Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 
300)), values.get(0));
   }
 
   @Test
-  public void testExternalNestedInvalidAccess() throws Exception {
+  public void testExternalNestedNonExistKeyAccess() throws Exception {
     List<String> stmt = new ArrayList<>();
+    // this triggers java.lang.RuntimeException: Cannot convert null to int
     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
     stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-                     "FROM FOO " +
-                     "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD['a'] = 
200");
+             "FROM FOO " +
+             "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(0, values.size());
+  }
+
+  @Test
+  public void testExternalNestedNonExistKeyAccess2() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    // this triggers java.lang.RuntimeException: Cannot convert null to int
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+             "FROM FOO " +
+             "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(0, values.size());
+  }
+
+  @Test
+  public void testExternalNestedInvalidAccessStringIndexOnArray() throws 
Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+             "FROM FOO " +
+             "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(0, values.size());
+  }
+
+  @Test
+  public void testExternalNestedArrayOutOfBoundAccess() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD 
ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+             "FROM FOO " +
+             "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200");
     StormSql sql = StormSql.construct();
     List<Values> values = new ArrayList<>();
     ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
@@ -215,9 +258,10 @@ public class TestStormSql {
 
   }
 
-  @Test
+  @Test(expected = CompilingClassLoader.CompilerException.class)
   public void testExternalUdfType2() throws Exception {
     List<String> stmt = new ArrayList<>();
+    // generated code will not be compilable since the return type of MYPLUS 
and the type of 'x' are different
     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 
'mock:///foo'");
     stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
     stmt.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index c3b9ad3..8ea0036 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -18,23 +18,28 @@
 package org.apache.storm.sql.compiler;
 
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.SqlType;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
@@ -150,9 +155,32 @@ public class TestCompilerUtils {
 
         StreamableTable streamableTable = new 
CompilerUtil.TableBuilderInfo(typeFactory)
                 .field("ID", SqlTypeName.INTEGER)
-                .field("MAPFIELD", SqlTypeName.ANY)
-                .field("NESTEDMAPFIELD", SqlTypeName.ANY)
-                .field("ARRAYFIELD", SqlTypeName.ANY)
+                .field("MAPFIELD",
+                        typeFactory.createTypeWithNullability(
+                                typeFactory.createMapType(
+                                        typeFactory.createTypeWithNullability(
+                                                
typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+                                        typeFactory.createTypeWithNullability(
+                                                
typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+                                , true))
+                .field("NESTEDMAPFIELD",
+                        typeFactory.createTypeWithNullability(
+                            typeFactory.createMapType(
+                                    typeFactory.createTypeWithNullability(
+                                            
typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+                                    typeFactory.createTypeWithNullability(
+                                            typeFactory.createMapType(
+                                                    
typeFactory.createTypeWithNullability(
+                                                            
typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+                                                    
typeFactory.createTypeWithNullability(
+                                                            
typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+                                            , true))
+                                        , true))
+                .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
+                        typeFactory.createArrayType(
+                            typeFactory.createTypeWithNullability(
+                                
typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
+                        , true))
                 .build();
         Table table = streamableTable.stream();
         schema.add("FOO", table);

http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
deleted file mode 100644
index ae95fcc..0000000
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.junit.Test;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-public class TestExprCompiler {
-  @Test
-  public void testLiteral() throws Exception {
-    String sql = "SELECT 1,1.0,TRUE,'FOO' FROM FOO";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    JavaTypeFactory typeFactory = new 
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
-    LogicalProject project = (LogicalProject) state.tree;
-    String[] res = new String[project.getChildExps().size()];
-    for (int i = 0; i < project.getChildExps().size(); ++i) {
-      StringWriter sw = new StringWriter();
-      try (PrintWriter pw = new PrintWriter(sw)) {
-        ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-        res[i] = project.getChildExps().get(i).accept(compiler);
-      }
-    }
-
-    assertArrayEquals(new String[] {"1", "1.0E0", "true", "\"FOO\""}, res);
-  }
-
-  @Test
-  public void testInputRef() throws Exception {
-    String sql = "SELECT ID FROM FOO";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    JavaTypeFactory typeFactory = new 
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
-    LogicalProject project = (LogicalProject) state.tree;
-    StringWriter sw = new StringWriter();
-    try (PrintWriter pw = new PrintWriter(sw)) {
-      ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-      project.getChildExps().get(0).accept(compiler);
-    }
-
-    assertThat(sw.toString(), 
containsString("(java.lang.Integer)(_data.get(0));"));
-  }
-
-  @Test
-  public void testCallExpr() throws Exception {
-    String sql = "SELECT 1>2, 3+5, 1-1.0, 3+ID FROM FOO";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
-    JavaTypeFactory typeFactory = new 
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
-    LogicalProject project = (LogicalProject) state.tree;
-    String[] res = new String[project.getChildExps().size()];
-    List<StringWriter> sw = new ArrayList<>();
-    for (int i = 0; i < project.getChildExps().size(); ++i) {
-      sw.add(new StringWriter());
-    }
-
-    for (int i = 0; i < project.getChildExps().size(); ++i) {
-      try (PrintWriter pw = new PrintWriter(sw.get(i))) {
-        ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-        res[i] = project.getChildExps().get(i).accept(compiler);
-      }
-    }
-    assertThat(sw.get(0).toString(), containsString("1 > 2"));
-    assertThat(sw.get(1).toString(), containsString("plus(3,5)"));
-    assertThat(sw.get(2).toString(), containsString("minus(1,1.0E0)"));
-    assertThat(sw.get(3).toString(), containsString("plus(3,"));
-  }
-}

Reply via email to