STORM-2125 Use Calcite's implementation of Rex Compiler * Upgrade Calcite to the latest 1.10.0 * address CALCITE-1386 to keep supporting nested map / array * try our best to keep runtime safety with nested map / array types * when given detailed type information, it supports the full features of nested map / array * borrow JaninoRexCompiler and modify it to return Expression instead of Scalar (RexNodeToBlockStatementCompiler) * since we need to pass the code block to Bolts, not an executable method implementation on a classloader * modify the codebase to use RexNodeToBlockStatementCompiler * remove ExprCompiler and its test * add tests for all of the scalar functions Calcite supports * we don't use Calcite aggregate functions and window functions, so no need to add them now * add a simple subquery unit test * need to dig in and add more cases of subqueries later * Fix aggregate fields issue with Trident behavior * Fix DATE/TIME unit tests with timezone issue * address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dec76a7e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dec76a7e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dec76a7e Branch: refs/heads/1.x-branch Commit: dec76a7e957d8f2f2c283becf6bc1ef53a191e1b Parents: cdcfaef Author: Jungtaek Lim <kabh...@gmail.com> Authored: Thu Sep 22 15:38:44 2016 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Wed Oct 12 18:26:29 2016 +0900 ---------------------------------------------------------------------- external/sql/storm-sql-core/pom.xml | 25 - .../storm-sql-core/src/codegen/data/Parser.tdd | 6 +- .../jvm/org/apache/storm/sql/StormSqlImpl.java | 9 +- .../apache/storm/sql/compiler/ExprCompiler.java | 523 ------------------- .../RexNodeToBlockStatementCompiler.java | 102 ++++ .../backends/standalone/PlanCompiler.java | 16 +- .../backends/standalone/RelNodeCompiler.java | 68 ++- .../compiler/backends/trident/PlanCompiler.java | 9 +- .../trident/TridentLogicalPlanCompiler.java | 84 +-- .../test/org/apache/storm/sql/TestStormSql.java | 70 ++- .../storm/sql/compiler/TestCompilerUtils.java | 34 +- .../storm/sql/compiler/TestExprCompiler.java | 93 ---- .../storm/sql/compiler/TestExprSemantic.java | 262 +++++++++- .../backends/standalone/TestPlanCompiler.java | 6 +- .../standalone/TestRelNodeCompiler.java | 2 +- .../backends/trident/TestPlanCompiler.java | 143 ++++- external/sql/storm-sql-runtime/pom.xml | 13 - .../calcite/interpreter/StormContext.java | 31 ++ .../sql/runtime/calcite/StormDataContext.java | 79 +++ .../trident/functions/EvaluationFilter.java | 21 +- .../trident/functions/EvaluationFunction.java | 22 +- .../test/org/apache/storm/sql/TestUtils.java | 91 +++- pom.xml | 11 +- 23 files changed, 923 insertions(+), 797 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/pom.xml ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml index f2e29ca..b8790a4 100644 --- a/external/sql/storm-sql-core/pom.xml +++ b/external/sql/storm-sql-core/pom.xml @@ -61,36 +61,11 @@ <version>${calcite.version}</version> <exclusions> <exclusion> - <groupId>commons-dbcp</groupId> - <artifactId>commons-dbcp</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.janino</groupId> - <artifactId>commons-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.pentaho</groupId> - <artifactId>pentaho-aggdesigner-algorithm</artifactId> - </exclusion> - <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </exclusion> </exclusions> </dependency> - <!-- janino --> - <dependency> - <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> - </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/codegen/data/Parser.tdd ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd index 1e921c7..22a60f6 100644 --- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd +++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd @@ -52,6 +52,9 @@ dataTypeParserMethods: [ ] + nonReservedKeywords: [ + ] + # List of files in @includes directory that have parser method # implementations for custom SQL 
statements, literals or types # given as part of "statementParserMethods", "literalParserMethods" or @@ -62,5 +65,6 @@ includeCompoundIdentifier: true, includeBraces: true, - includeAdditionalDeclarations: false + includeAdditionalDeclarations: false, + allowBangEqual: false } http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java index 61f3b6f..368bbea 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java @@ -17,6 +17,8 @@ */ package org.apache.storm.sql; +import org.apache.calcite.DataContext; +import org.apache.storm.sql.runtime.calcite.StormDataContext; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.prepare.CalciteCatalogReader; @@ -41,18 +43,15 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.Planner; import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler; -import org.apache.storm.sql.javac.CompilingClassLoader; import org.apache.storm.sql.parser.ColumnConstraint; import org.apache.storm.sql.parser.ColumnDefinition; import org.apache.storm.sql.parser.SqlCreateFunction; import org.apache.storm.sql.parser.SqlCreateTable; import org.apache.storm.sql.parser.StormParser; import org.apache.storm.sql.runtime.*; -import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor; import org.apache.storm.trident.TridentTopology; import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; import java.io.FileOutputStream; import java.io.IOException; import 
java.lang.reflect.Method; @@ -66,7 +65,6 @@ import java.util.Map; import java.util.jar.Attributes; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; -import java.util.zip.ZipEntry; import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo; @@ -115,13 +113,14 @@ class StormSqlImpl extends StormSql { } else if (node instanceof SqlCreateFunction) { handleCreateFunction((SqlCreateFunction) node); } else { + DataContext dataContext = new StormDataContext(); FrameworkConfig config = buildFrameWorkConfig(); Planner planner = Frameworks.getPlanner(config); SqlNode parse = planner.parse(sql); SqlNode validate = planner.validate(parse); RelNode tree = planner.convert(validate); org.apache.storm.sql.compiler.backends.trident.PlanCompiler compiler = - new org.apache.storm.sql.compiler.backends.trident.PlanCompiler(dataSources, typeFactory); + new org.apache.storm.sql.compiler.backends.trident.PlanCompiler(dataSources, typeFactory, dataContext); TridentTopology topo = compiler.compile(tree); Path jarPath = null; try { http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java deleted file mode 100644 index 4e1c127..0000000 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java +++ /dev/null @@ -1,523 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.sql.compiler; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.primitives.Primitives; -import org.apache.calcite.adapter.enumerable.NullPolicy; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.linq4j.tree.Primitive; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.*; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.schema.Function; -import org.apache.calcite.schema.impl.ReflectiveFunctionBase; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlUserDefinedFunction; -import org.apache.calcite.util.BuiltInMethod; -import org.apache.calcite.util.NlsString; -import org.apache.calcite.util.Util; -import org.apache.storm.sql.runtime.StormSqlFunctions; - -import java.io.PrintWriter; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.Type; -import java.math.BigDecimal; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static org.apache.calcite.sql.fun.SqlStdOperatorTable.*; - -/** - * Compile RexNode on top of the Tuple 
abstraction. - */ -public class ExprCompiler implements RexVisitor<String> { - private final PrintWriter pw; - private final JavaTypeFactory typeFactory; - private static final ImpTable IMP_TABLE = new ImpTable(); - private int nameCount; - - public ExprCompiler(PrintWriter pw, JavaTypeFactory typeFactory) { - this.pw = pw; - this.typeFactory = typeFactory; - } - - @Override - public String visitInputRef(RexInputRef rexInputRef) { - String name = reserveName(); - String typeName = javaTypeName(rexInputRef); - String boxedTypeName = boxedJavaTypeName(rexInputRef); - pw.print(String.format("%s %s = (%s)(_data.get(%d));\n", typeName, name, - boxedTypeName, rexInputRef.getIndex())); - return name; - } - - @Override - public String visitLocalRef(RexLocalRef rexLocalRef) { - throw new UnsupportedOperationException(); - } - - @Override - public String visitLiteral(RexLiteral rexLiteral) { - Object v = rexLiteral.getValue(); - RelDataType ty = rexLiteral.getType(); - switch(rexLiteral.getTypeName()) { - case BOOLEAN: - return v.toString(); - case CHAR: - return CompilerUtil.escapeJavaString(((NlsString) v).getValue(), true); - case NULL: - return "((" + ((Class<?>)typeFactory.getJavaClass(ty)).getCanonicalName() + ")null)"; - case DOUBLE: - case BIGINT: - case DECIMAL: - switch (ty.getSqlTypeName()) { - case TINYINT: - case SMALLINT: - case INTEGER: - return Long.toString(((BigDecimal) v).longValueExact()); - case BIGINT: - return Long.toString(((BigDecimal)v).longValueExact()) + 'L'; - case DECIMAL: - case FLOAT: - case REAL: - case DOUBLE: - return Util.toScientificNotation((BigDecimal) v); - } - break; - default: - throw new UnsupportedOperationException(); - } - return null; - } - - @Override - public String visitCall(RexCall rexCall) { - return IMP_TABLE.compile(this, rexCall); - } - - @Override - public String visitOver(RexOver rexOver) { - throw new UnsupportedOperationException(); - } - - @Override - public String visitCorrelVariable( - RexCorrelVariable 
rexCorrelVariable) { - throw new UnsupportedOperationException(); - } - - @Override - public String visitDynamicParam( - RexDynamicParam rexDynamicParam) { - throw new UnsupportedOperationException(); - } - - @Override - public String visitRangeRef(RexRangeRef rexRangeRef) { - throw new UnsupportedOperationException(); - } - - @Override - public String visitFieldAccess( - RexFieldAccess rexFieldAccess) { - throw new UnsupportedOperationException(); - } - - private String javaTypeName(RexNode node) { - Type ty = typeFactory.getJavaClass(node.getType()); - return ((Class<?>)ty).getCanonicalName(); - } - - private String boxedJavaTypeName(RexNode node) { - Type ty = typeFactory.getJavaClass(node.getType()); - Class clazz = (Class<?>)ty; - if (clazz.isPrimitive()) { - clazz = Primitives.wrap(clazz); - } - - return clazz.getCanonicalName(); - } - - private String reserveName() { - return "t" + ++nameCount; - } - - // Only generate inline expressions when comparing primitive types - private boolean primitiveCompareExpr(SqlOperator op, RelDataType type) { - final Primitive primitive = Primitive.ofBoxOr(typeFactory.getJavaClass(type)); - return primitive != null && - (op == LESS_THAN || op == LESS_THAN_OR_EQUAL || op == GREATER_THAN || op == GREATER_THAN_OR_EQUAL); - } - - private interface CallExprPrinter { - String translate(ExprCompiler compiler, RexCall call); - } - - /** - * Inspired by Calcite's RexImpTable, the ImpTable class maps the operators - * to their corresponding implementation that generates the expressions in - * the format of Java source code. 
- */ - private static class ImpTable { - private final Map<SqlOperator, CallExprPrinter> translators; - - private ImpTable() { - ImmutableMap.Builder<SqlOperator, CallExprPrinter> builder = - ImmutableMap.builder(); - builder - .put(builtInMethod(UPPER, BuiltInMethod.UPPER, NullPolicy.STRICT)) - .put(builtInMethod(LOWER, BuiltInMethod.LOWER, NullPolicy.STRICT)) - .put(builtInMethod(INITCAP, BuiltInMethod.INITCAP, NullPolicy.STRICT)) - .put(builtInMethod(SUBSTRING, BuiltInMethod.SUBSTRING, NullPolicy.STRICT)) - .put(builtInMethod(CHARACTER_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT)) - .put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT)) - .put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, NullPolicy.STRICT)) - .put(builtInMethod(ITEM, BuiltInMethod.ANY_ITEM, NullPolicy.STRICT)) - .put(infixBinary(LESS_THAN, "<", "lt")) - .put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le")) - .put(infixBinary(GREATER_THAN, ">", "gt")) - .put(infixBinary(GREATER_THAN_OR_EQUAL, ">=", "ge")) - .put(infixBinary(EQUALS, "==", StormSqlFunctions.class, "eq")) - .put(infixBinary(NOT_EQUALS, "<>", StormSqlFunctions.class, "ne")) - .put(infixBinary(PLUS, "+", "plus")) - .put(infixBinary(MINUS, "-", "minus")) - .put(infixBinary(MULTIPLY, "*", "multiply")) - .put(infixBinary(DIVIDE, "/", "divide")) - .put(infixBinary(DIVIDE_INTEGER, "/", "divide")) - .put(expect(IS_NULL, null)) - .put(expectNot(IS_NOT_NULL, null)) - .put(expect(IS_TRUE, true)) - .put(expectNot(IS_NOT_TRUE, true)) - .put(expect(IS_FALSE, false)) - .put(expectNot(IS_NOT_FALSE, false)) - .put(AND, AND_EXPR) - .put(OR, OR_EXPR) - .put(NOT, NOT_EXPR) - .put(CAST, CAST_EXPR); - this.translators = builder.build(); - } - - private CallExprPrinter getCallExprPrinter(SqlOperator op) { - if (translators.containsKey(op)) { - return translators.get(op); - } else if (op instanceof SqlUserDefinedFunction) { - Function function = ((SqlUserDefinedFunction) op).getFunction(); - if (function instanceof 
ReflectiveFunctionBase) { - Method method = ((ReflectiveFunctionBase) function).method; - return methodCall(op, method, NullPolicy.STRICT).getValue(); - } - } - return null; - } - - private String compile(ExprCompiler compiler, RexCall call) { - SqlOperator op = call.getOperator(); - CallExprPrinter printer = getCallExprPrinter(op); - if (printer == null) { - throw new UnsupportedOperationException(); - } else { - return printer.translate(compiler, call); - } - } - - private Map.Entry<SqlOperator, CallExprPrinter> methodCall( - final SqlOperator op, final Method method, NullPolicy nullPolicy) { - if (nullPolicy != NullPolicy.STRICT) { - throw new UnsupportedOperationException(); - } - CallExprPrinter printer = new CallExprPrinter() { - @Override - public String translate(ExprCompiler compiler, RexCall call) { - PrintWriter pw = compiler.pw; - String val = compiler.reserveName(); - pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val)); - List<String> args = new ArrayList<>(); - for (RexNode op : call.getOperands()) { - args.add(op.accept(compiler)); - } - pw.print("if (false) {}\n"); - for (int i = 0; i < args.size(); ++i) { - String arg = args.get(i); - if (call.getOperands().get(i).getType().isNullable()) { - pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, arg)); - } - } - String calc = printMethodCall(method, args); - pw.print(String.format("else { %1$s = %2$s; }\n", val, calc)); - return val; - } - }; - return new AbstractMap.SimpleImmutableEntry<>(op, printer); - } - - private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod( - final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) { - return methodCall(op, method.method, nullPolicy); - } - - private Map.Entry<SqlOperator, CallExprPrinter> infixBinary - (final SqlOperator op, final String javaOperator, final Class<?> clazz, final String backupMethodName) { - CallExprPrinter trans = new CallExprPrinter() { - @Override - public String 
translate( - ExprCompiler compiler, RexCall call) { - int size = call.getOperands().size(); - assert size == 2; - String val = compiler.reserveName(); - RexNode op0 = call.getOperands().get(0); - RexNode op1 = call.getOperands().get(1); - PrintWriter pw = compiler.pw; - if (backupMethodName != null) { - if (!compiler.primitiveCompareExpr(op, op0.getType())) { - String lhs = op0.accept(compiler); - String rhs = op1.accept(compiler); - pw.print(String.format("%s %s = %s;\n", compiler.javaTypeName(call), val, - printMethodCall(clazz, backupMethodName, true, Lists.newArrayList(lhs, rhs)))); - return val; - } - } - boolean lhsNullable = op0.getType().isNullable(); - boolean rhsNullable = op1.getType().isNullable(); - - pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val)); - String lhs = op0.accept(compiler); - String rhs = op1.accept(compiler); - pw.print("if (false) {}\n"); - if (lhsNullable) { - String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", op1); - pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; }\n", val, lhs, calc)); - } - if (rhsNullable) { - String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", op0); - pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; }\n", val, rhs, calc)); - } - String calc = String.format("%s %s %s", lhs, javaOperator, rhs); - pw.print(String.format("else { %1$s = %2$s; }\n", val, calc)); - return val; - } - }; - return new AbstractMap.SimpleImmutableEntry<>(op, trans); - } - - private Map.Entry<SqlOperator, CallExprPrinter> infixBinary - (final SqlOperator op, final String javaOperator, final String backupMethodName) { - return infixBinary(op, javaOperator, SqlFunctions.class, backupMethodName); - } - - private Map.Entry<SqlOperator, CallExprPrinter> expect( - SqlOperator op, final Boolean expect) { - return expect0(op, expect, false); - } - - private Map.Entry<SqlOperator, CallExprPrinter> expectNot( - SqlOperator op, final 
Boolean expect) { - return expect0(op, expect, true); - } - - private Map.Entry<SqlOperator, CallExprPrinter> expect0( - SqlOperator op, final Boolean expect, final boolean negate) { - CallExprPrinter trans = new CallExprPrinter() { - @Override - public String translate( - ExprCompiler compiler, RexCall call) { - assert call.getOperands().size() == 1; - String val = compiler.reserveName(); - RexNode operand = call.getOperands().get(0); - boolean nullable = operand.getType().isNullable(); - String op = operand.accept(compiler); - PrintWriter pw = compiler.pw; - if (!nullable) { - if (expect == null) { - pw.print(String.format("boolean %s = %b;\n", val, !negate)); - } else { - pw.print(String.format("boolean %s = %s == %b;\n", val, op, - expect ^ negate)); - } - } else { - String expr; - if (expect == null) { - expr = String.format("%s == null", op); - } else { - expr = String.format("%s == Boolean.%s", op, expect ? "TRUE" : - "FALSE"); - } - if (negate) { - expr = String.format("!(%s)", expr); - } - pw.print(String.format("boolean %s = %s;\n", val, expr)); - } - return val; - } - }; - return new AbstractMap.SimpleImmutableEntry<>(op, trans); - } - - - // If any of the arguments are false, result is false; - // else if any arguments are null, result is null; - // else true. 
- private static final CallExprPrinter AND_EXPR = new CallExprPrinter() { - @Override - public String translate( - ExprCompiler compiler, RexCall call) { - String val = compiler.reserveName(); - PrintWriter pw = compiler.pw; - pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), - val)); - RexNode op0 = call.getOperands().get(0); - RexNode op1 = call.getOperands().get(1); - boolean lhsNullable = op0.getType().isNullable(); - boolean rhsNullable = op1.getType().isNullable(); - String lhs = op0.accept(compiler); - if (!lhsNullable) { - pw.print(String.format("if (!(%2$s)) { %1$s = false; }\n", val, lhs)); - pw.print("else {\n"); - String rhs = op1.accept(compiler); - pw.print(String.format(" %1$s = %2$s;\n}\n", val, rhs)); - } else { - String foldedLHS = foldNullExpr( - String.format("%1$s == null || %1$s", lhs), "true", op0); - pw.print(String.format("if (%s) {\n", foldedLHS)); - String rhs = op1.accept(compiler); - String s; - if (rhsNullable) { - s = foldNullExpr( - String.format("(%2$s != null && !(%2$s)) ? Boolean.FALSE : ((%1$s == null || %2$s == null) ? null : Boolean.TRUE)", - lhs, rhs), "null", op1); - } else { - s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs); - } - pw.print(String.format(" %1$s = %2$s;\n", val, s)); - pw.print(String.format("} else { %1$s = false; }\n", val)); - } - return val; - } - }; - - // If any of the arguments are true, result is true; - // else if any arguments are null, result is null; - // else false. 
- private static final CallExprPrinter OR_EXPR = new CallExprPrinter() { - @Override - public String translate( - ExprCompiler compiler, RexCall call) { - String val = compiler.reserveName(); - PrintWriter pw = compiler.pw; - pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), - val)); - RexNode op0 = call.getOperands().get(0); - RexNode op1 = call.getOperands().get(1); - boolean lhsNullable = op0.getType().isNullable(); - boolean rhsNullable = op1.getType().isNullable(); - String lhs = op0.accept(compiler); - if (!lhsNullable) { - pw.print(String.format("if (%2$s) { %1$s = true; }\n", val, lhs)); - pw.print("else {\n"); - String rhs = op1.accept(compiler); - pw.print(String.format(" %1$s = %2$s;\n}\n", val, rhs)); - } else { - String foldedLHS = foldNullExpr( - String.format("%1$s == null || !(%1$s)", lhs), "true", op0); - pw.print(String.format("if (%s) {\n", foldedLHS)); - String rhs = op1.accept(compiler); - String s; - if (rhsNullable) { - s = foldNullExpr( - String.format("(%2$s != null && %2$s) ? Boolean.TRUE : ((%1$s == null || %2$s == null) ? null : Boolean.FALSE)", - lhs, rhs), - "null", op1); - } else { - s = String.format("%2$s ? Boolean.valueOf(%2$s) : %1$s", lhs, rhs); - } - pw.print(String.format(" %1$s = %2$s;\n", val, s)); - pw.print(String.format("} else { %1$s = true; }\n", val)); - } - return val; - } - }; - - private static final CallExprPrinter NOT_EXPR = new CallExprPrinter() { - @Override - public String translate( - ExprCompiler compiler, RexCall call) { - String val = compiler.reserveName(); - PrintWriter pw = compiler.pw; - RexNode op = call.getOperands().get(0); - String lhs = op.accept(compiler); - boolean nullable = call.getType().isNullable(); - pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), - val)); - if (!nullable) { - pw.print(String.format("%1$s = !(%2$s);\n", val, lhs)); - } else { - String s = foldNullExpr( - String.format("%1$s == null ? 
null : !(%1$s)", lhs), "null", op); - pw.print(String.format("%1$s = %2$s;\n", val, s)); - } - return val; - } - }; - - - private static final CallExprPrinter CAST_EXPR = new CallExprPrinter() { - @Override - public String translate( - ExprCompiler compiler, RexCall call) { - String val = compiler.reserveName(); - PrintWriter pw = compiler.pw; - RexNode op = call.getOperands().get(0); - String lhs = op.accept(compiler); - pw.print(String.format("final %1$s %2$s = (%1$s) %3$s;\n", - compiler.boxedJavaTypeName(call), val, lhs)); - return val; - } - }; - } - - private static String foldNullExpr(String notNullExpr, String - nullExpr, RexNode op) { - if (op instanceof RexLiteral && ((RexLiteral)op).getTypeName() == SqlTypeName.NULL) { - return nullExpr; - } else { - return notNullExpr; - } - } - - public static String printMethodCall(Method method, List<String> args) { - return printMethodCall(method.getDeclaringClass(), method.getName(), - Modifier.isStatic(method.getModifiers()), args); - } - - private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) { - if (isStatic) { - return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args)); - } else { - return String.format("%s.%s(%s)", args.get(0), method, - Joiner.on(',').join(args.subList(1, args.size()))); - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java new file mode 100644 index 0000000..b9be471 --- /dev/null +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java @@ -0,0 
+1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.sql.compiler; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.adapter.enumerable.RexToLixTranslator; +import org.apache.calcite.interpreter.Context; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.function.Function1; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.BlockStatement; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** + * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) 
to expression ({@link org.apache.calcite.linq4j.tree.Expression}). + * + * This code is inspired by JaninoRexCompiler in Calcite, but while it is returning {@link org.apache.calcite.interpreter.Scalar} which is executable, + * we need to pass the source code to EvaluationFilter or EvaluationFunction so that they can be serialized, and + * compiled and executed on worker. + */ +public class RexNodeToBlockStatementCompiler { + private final RexBuilder rexBuilder; + + public RexNodeToBlockStatementCompiler(RexBuilder rexBuilder) { + this.rexBuilder = rexBuilder; + } + + public BlockStatement compile(List<RexNode> nodes, RelDataType inputRowType) { + final RexProgramBuilder programBuilder = + new RexProgramBuilder(inputRowType, rexBuilder); + for (RexNode node : nodes) { + programBuilder.addProject(node, null); + } + final RexProgram program = programBuilder.getProgram(); + + final BlockBuilder builder = new BlockBuilder(); + final ParameterExpression context_ = + Expressions.parameter(Context.class, "context"); + final ParameterExpression outputValues_ = + Expressions.parameter(Object[].class, "outputValues"); + final JavaTypeFactoryImpl javaTypeFactory = + new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); + + final RexToLixTranslator.InputGetter inputGetter = + new RexToLixTranslator.InputGetterImpl( + ImmutableList.of( + Pair.<Expression, PhysType>of( + Expressions.field(context_, + BuiltInMethod.CONTEXT_VALUES.field), + PhysTypeImpl.of(javaTypeFactory, inputRowType, + JavaRowFormat.ARRAY, false)))); + final Function1<String, RexToLixTranslator.InputGetter> correlates = + new Function1<String, RexToLixTranslator.InputGetter>() { + public RexToLixTranslator.InputGetter apply(String a0) { + throw new UnsupportedOperationException(); + } + }; + final Expression root = + Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field); + final List<Expression> list = + RexToLixTranslator.translateProjects(program, javaTypeFactory, builder, + null, root, 
inputGetter, correlates); + for (int i = 0; i < list.size(); i++) { + builder.add( + Expressions.statement( + Expressions.assign( + Expressions.arrayIndex(outputValues_, + Expressions.constant(i)), + list.get(i)))); + } + + return builder.toBlock(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java index 4c69da1..01546ed 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java @@ -24,16 +24,17 @@ import org.apache.calcite.rel.core.TableScan; import org.apache.storm.sql.compiler.CompilerUtil; import org.apache.storm.sql.javac.CompilingClassLoader; import org.apache.storm.sql.runtime.AbstractValuesProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.PrintWriter; import java.io.StringWriter; -import java.net.URLClassLoader; -import java.util.ArrayDeque; import java.util.HashSet; -import java.util.Queue; import java.util.Set; public class PlanCompiler { + private static final Logger LOG = LoggerFactory.getLogger(PlanCompiler.class); + private static final Joiner NEW_LINE_JOINER = Joiner.on("\n"); private static final String PACKAGE_NAME = "org.apache.storm.sql.generated"; private static final String PROLOGUE = NEW_LINE_JOINER.join( @@ -50,7 +51,13 @@ public class PlanCompiler { "import org.apache.storm.sql.runtime.AbstractValuesProcessor;", "import com.google.common.collect.ArrayListMultimap;", "import 
com.google.common.collect.Multimap;", - "public final class Processor extends AbstractValuesProcessor {", ""); + "import org.apache.calcite.interpreter.Context;", + "import org.apache.calcite.interpreter.StormContext;", + "import org.apache.calcite.DataContext;", + "import org.apache.storm.sql.runtime.calcite.StormDataContext;", + "public final class Processor extends AbstractValuesProcessor {", + " public final static DataContext dataContext = new StormDataContext();", + ""); private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join( " @Override", " public void initialize(Map<String, DataSource> data,", @@ -113,6 +120,7 @@ public class PlanCompiler { public AbstractValuesProcessor compile(RelNode plan) throws Exception { String javaCode = generateJavaSource(plan); + LOG.debug("Compiling... source code {}", javaCode); ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(), PACKAGE_NAME + ".Processor", javaCode, null); http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java index 5c674ad..98409c1 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java @@ -20,6 +20,7 @@ package org.apache.storm.sql.compiler.backends.standalone; import com.google.common.base.Joiner; import com.google.common.primitives.Primitives; import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.tree.BlockStatement; import org.apache.calcite.plan.RelOptUtil; 
import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; @@ -30,18 +31,23 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.stream.Delta; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.AggregateFunction; import org.apache.calcite.schema.impl.AggregateFunctionImpl; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; -import org.apache.storm.sql.compiler.ExprCompiler; import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor; +import org.apache.storm.sql.compiler.RexNodeToBlockStatementCompiler; +import org.apache.storm.tuple.Values; import java.io.PrintWriter; import java.io.StringWriter; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.lang.reflect.Type; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,6 +60,8 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { private final PrintWriter pw; private final JavaTypeFactory typeFactory; + private final RexNodeToBlockStatementCompiler rexCompiler; + private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join( " private static final ChannelHandler %1$s = ", " new AbstractChannelHandler() {", @@ -196,6 +204,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) { this.pw = pw; this.typeFactory = typeFactory; + this.rexCompiler = new RexNodeToBlockStatementCompiler(new RexBuilder(typeFactory)); } @Override @@ -207,8 +216,18 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { @Override public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception { 
beginStage(filter); - ExprCompiler compiler = new ExprCompiler(pw, typeFactory); - String r = filter.getCondition().accept(compiler); + + List<RexNode> childExps = filter.getChildExps(); + RelDataType inputRowType = filter.getInput(0).getRowType(); + + pw.print("Context context = new StormContext(Processor.dataContext);\n"); + pw.print("context.values = _data.toArray();\n"); + pw.print("Object[] outputValues = new Object[1];\n"); + + BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType); + pw.write(codeBlock.toString()); + + String r = "((Boolean) outputValues[0])"; if (filter.getCondition().getType().isNullable()) { pw.print(String.format(" if (%s != null && %s) { ctx.emit(_data); }\n", r, r)); } else { @@ -221,15 +240,19 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { @Override public Void visitProject(Project project, List<Void> inputStreams) throws Exception { beginStage(project); - ExprCompiler compiler = new ExprCompiler(pw, typeFactory); - int size = project.getChildExps().size(); - String[] res = new String[size]; - for (int i = 0; i < size; ++i) { - res[i] = project.getChildExps().get(i).accept(compiler); - } - pw.print(String.format(" ctx.emit(new Values(%s));\n", - Joiner.on(',').join(res))); + List<RexNode> childExps = project.getChildExps(); + RelDataType inputRowType = project.getInput(0).getRowType(); + int outputCount = project.getRowType().getFieldCount(); + + pw.print("Context context = new StormContext(Processor.dataContext);\n"); + pw.print("context.values = _data.toArray();\n"); + pw.print(String.format("Object[] outputValues = new Object[%d];\n", outputCount)); + + BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType); + pw.write(codeBlock.toString()); + + pw.print(" ctx.emit(new Values(outputValues));\n"); endStage(); return null; } @@ -332,7 +355,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { } args.add(String.format("(%s)accumulators.get(\"%s\")", 
accumulatorType.getCanonicalName(), varName)); pw.println(String.format(" final %s %s = %s;", resultType.getCanonicalName(), - resultName, ExprCompiler.printMethodCall(aggFn.resultMethod, args))); + resultName, printMethodCall(aggFn.resultMethod, args))); return resultName; } @@ -386,7 +409,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { } args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s", "accumulators.get(\"" + varName + "\")", - ExprCompiler.printMethodCall(aggFn.initMethod, args), + printMethodCall(aggFn.initMethod, args), accumulatorType.getCanonicalName())); if (argList.isEmpty()) { args.add("EMPTY_VALUES"); @@ -397,7 +420,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { } pw.print(String.format(" accumulators.put(\"%s\", %s);\n", varName, - ExprCompiler.printMethodCall(aggFn.addMethod, args))); + printMethodCall(aggFn.addMethod, args))); } private String reserveAggVarName(AggregateCall call) { @@ -449,4 +472,19 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { } return res.toString(); } + + public static String printMethodCall(Method method, List<String> args) { + return printMethodCall(method.getDeclaringClass(), method.getName(), + Modifier.isStatic(method.getModifiers()), args); + } + + private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) { + if (isStatic) { + return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args)); + } else { + return String.format("%s.%s(%s)", args.get(0), method, + Joiner.on(',').join(args.subList(1, args.size()))); + } + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java index 41777e5..ad98d10 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java @@ -18,6 +18,7 @@ */ package org.apache.storm.sql.compiler.backends.trident; +import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.rel.RelNode; import org.apache.storm.sql.runtime.ISqlTridentDataSource; @@ -31,16 +32,18 @@ import java.util.Map; public class PlanCompiler { private Map<String, ISqlTridentDataSource> sources; private final JavaTypeFactory typeFactory; + private final DataContext dataContext; - public PlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory) { + public PlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory, DataContext dataContext) { this.sources = sources; this.typeFactory = typeFactory; + this.dataContext = dataContext; } public AbstractTridentProcessor compileForTest(RelNode plan) throws Exception { final TridentTopology topology = new TridentTopology(); - TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology); + TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology, dataContext); final IAggregatableStream stream = compiler.traverse(plan); return new AbstractTridentProcessor() { @@ -59,7 +62,7 @@ public class PlanCompiler { public TridentTopology compile(RelNode plan) throws Exception { TridentTopology topology = new TridentTopology(); - TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology); + TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology, dataContext); compiler.traverse(plan); 
return topology; http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java index a0bfa21..8b08121 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java @@ -21,7 +21,9 @@ package org.apache.storm.sql.compiler.backends.trident; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.primitives.Primitives; +import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.tree.BlockStatement; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; @@ -32,11 +34,15 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableModify; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.impl.AggregateFunctionImpl; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; -import org.apache.storm.sql.compiler.ExprCompiler; +import org.apache.calcite.util.Pair; import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor; +import org.apache.storm.sql.compiler.RexNodeToBlockStatementCompiler; 
import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter; import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction; @@ -92,12 +98,17 @@ class TransformInformation { public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggregatableStream> { protected final Map<String, ISqlTridentDataSource> sources; protected final JavaTypeFactory typeFactory; + protected final RexNodeToBlockStatementCompiler rexCompiler; + private final DataContext dataContext; protected TridentTopology topology; - public TridentLogicalPlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory, TridentTopology topology) { + public TridentLogicalPlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory, + TridentTopology topology, DataContext dataContext) { this.sources = sources; this.typeFactory = typeFactory; this.topology = topology; + this.rexCompiler = new RexNodeToBlockStatementCompiler(new RexBuilder(typeFactory)); + this.dataContext = dataContext; } @Override @@ -202,26 +213,24 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega // Trident doesn't allow duplicated field name... need to do the trick... 
List<String> outputFieldNames = project.getRowType().getFieldNames(); + int outputCount = outputFieldNames.size(); List<String> temporaryOutputFieldNames = new ArrayList<>(); for (String outputFieldName : outputFieldNames) { temporaryOutputFieldNames.add("__" + outputFieldName + "__"); } - try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { - pw.write("import org.apache.storm.tuple.Values;\n"); - - ExprCompiler compiler = new ExprCompiler(pw, typeFactory); + try (StringWriter sw = new StringWriter()) { + List<RexNode> childExps = project.getChildExps(); + RelDataType inputRowType = project.getInput(0).getRowType(); + BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType); + sw.write(codeBlock.toString()); - int size = project.getChildExps().size(); - String[] res = new String[size]; - for (int i = 0; i < size; ++i) { - res[i] = project.getChildExps().get(i).accept(compiler); - } + // we use out parameter and don't use the return value but compile fails... 
+ sw.write("return 0;"); - pw.write(String.format("\nreturn new Values(%s);", Joiner.on(',').join(res))); final String expression = sw.toString(); - return inputStream.each(inputFields, new EvaluationFunction(expression), new Fields(temporaryOutputFieldNames)) + return inputStream.each(inputFields, new EvaluationFunction(expression, outputCount, dataContext), new Fields(temporaryOutputFieldNames)) .project(new Fields(temporaryOutputFieldNames)) .each(new Fields(temporaryOutputFieldNames), new ForwardFunction(), new Fields(outputFieldNames)) .project(new Fields(outputFieldNames)) @@ -239,12 +248,17 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega String stageName = getStageName(filter); try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { - ExprCompiler compiler = new ExprCompiler(pw, typeFactory); - String ret = filter.getCondition().accept(compiler); - pw.write(String.format("\nreturn %s;", ret)); + List<RexNode> childExps = filter.getChildExps(); + RelDataType inputRowType = filter.getInput(0).getRowType(); + BlockStatement codeBlock = rexCompiler.compile(childExps, inputRowType); + sw.write(codeBlock.toString()); + + // we use out parameter and don't use the return value but compile fails... 
+ sw.write("return 0;"); + final String expression = sw.toString(); - return inputStream.filter(new EvaluationFilter(expression)).name(stageName); + return inputStream.filter(new EvaluationFilter(expression, dataContext)).name(stageName); } } @@ -257,6 +271,8 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega Stream inputStream = inputStreams.get(0).toStream(); String stageName = getStageName(aggregate); + List<String> outputFieldNames = aggregate.getRowType().getFieldNames(); + List<String> groupByFieldNames = new ArrayList<>(); for (Integer idx : aggregate.getGroupSet()) { String fieldName = inputStream.getOutputFields().get(idx); @@ -268,8 +284,17 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega ChainedAggregatorDeclarer chainedAggregatorDeclarer = groupedStream.chainedAgg(); List<TransformInformation> transformsAfterChained = new ArrayList<>(); - for (AggregateCall call : aggregate.getAggCallList()) { - appendAggregationInChain(chainedAggregatorDeclarer, groupByFieldNames, inputStream, call, transformsAfterChained); + + // Trident doesn't allow duplicated field name... need to do the trick... 
+ List<String> temporaryOutputFieldNames = new ArrayList<>(); + List<String> originOutputFieldNames = new ArrayList<>(); + for (Pair<AggregateCall, String> aggCallToOutFieldName : aggregate.getNamedAggCalls()) { + AggregateCall call = aggCallToOutFieldName.getKey(); + String outFileName = aggCallToOutFieldName.getValue(); + String tempOutFileName = "__" + outFileName + "__"; + temporaryOutputFieldNames.add(tempOutFileName); + originOutputFieldNames.add(outFileName); + appendAggregationInChain(chainedAggregatorDeclarer, groupByFieldNames, tempOutFileName, inputStream, call, transformsAfterChained); } Stream stream = chainedAggregatorDeclarer.chainEnd(); @@ -277,14 +302,13 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega stream = stream.each(transformInformation.getInputFields(), transformInformation.getFunction(), transformInformation.getFunctionFields()); } - // We're OK to project by Calcite information since each aggregation function create output fields via that information - List<String> outputFields = aggregate.getRowType().getFieldNames(); - return stream.project(new Fields(outputFields)).name(stageName); + return stream + .each(new Fields(temporaryOutputFieldNames), new ForwardFunction(), new Fields(originOutputFieldNames)) + .project(new Fields(outputFieldNames)).name(stageName); } private void appendAggregationInChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames, - Stream inputStream, AggregateCall call, List<TransformInformation> transformsAfterChained) { - String outputField = call.getName(); + String outputFieldName, Stream inputStream, AggregateCall call, List<TransformInformation> transformsAfterChained) { SqlAggFunction aggFunction = call.getAggregation(); String aggregationName = call.getAggregation().getName(); @@ -300,27 +324,27 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega } if (aggFunction instanceof SqlUserDefinedAggFunction) { - 
appendUDAFToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField); + appendUDAFToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputFieldName); } else { switch (aggregationName.toUpperCase()) { case "COUNT": - appendCountFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField); + appendCountFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputFieldName); break; case "MAX": - appendMaxFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField); + appendMaxFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputFieldName); break; case "MIN": - appendMinFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField); + appendMinFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputFieldName); break; case "SUM": - appendSumFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField); + appendSumFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputFieldName); break; case "AVG": - appendAvgFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField, transformsAfterChained); + appendAvgFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputFieldName, transformsAfterChained); break; default: http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java index 025219b..b0cffdd 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java @@ -18,9 +18,8 @@ package org.apache.storm.sql; import 
com.google.common.collect.ImmutableMap; -import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.tools.ValidationException; -import org.apache.storm.sql.compiler.backends.standalone.BuiltinAggregateFunctions; +import org.apache.storm.sql.javac.CompilingClassLoader; import org.apache.storm.sql.runtime.ChannelHandler; import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.DataSourcesProvider; @@ -76,7 +75,7 @@ public class TestStormSql { @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, String properties, List<FieldInfo> fields) { - throw new UnsupportedOperationException("Not supported"); + return new TestUtils.MockSqlTridentDataSource(); } } @@ -96,7 +95,7 @@ public class TestStormSql { @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, String properties, List<FieldInfo> fields) { - throw new UnsupportedOperationException("Not supported"); + return new TestUtils.MockSqlTridentGroupedDataSource(); } } @@ -116,7 +115,7 @@ public class TestStormSql { @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, String properties, List<FieldInfo> fields) { - throw new UnsupportedOperationException("Not supported"); + return new TestUtils.MockSqlTridentJoinDataSourceEmp(); } } @@ -136,7 +135,7 @@ public class TestStormSql { @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, String properties, List<FieldInfo> fields) { - throw new UnsupportedOperationException("Not supported"); + return new TestUtils.MockSqlTridentJoinDataSourceDept(); } } @@ -174,9 +173,9 @@ public class TestStormSql { public void testExternalDataSourceNested() throws Exception { List<String> stmt = new ArrayList<>(); stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, 
ARRAYFIELD ANY) LOCATION 'mocknested:///foo'"); - stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " + + stmt.add("SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " + "FROM FOO " + - "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 200"); + "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND CAST(ARRAYFIELD[2] AS INTEGER) = 200"); StormSql sql = StormSql.construct(); List<Values> values = new ArrayList<>(); ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); @@ -184,16 +183,60 @@ public class TestStormSql { System.out.println(values); Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4); Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map); - Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0)); + Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0)); } @Test - public void testExternalNestedInvalidAccess() throws Exception { + public void testExternalNestedNonExistKeyAccess() throws Exception { List<String> stmt = new ArrayList<>(); + // this triggers java.lang.RuntimeException: Cannot convert null to int stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'"); stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " + - "FROM FOO " + - "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD['a'] = 200"); + "FROM FOO " + + "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + Assert.assertEquals(0, values.size()); + } + + @Test + public void testExternalNestedNonExistKeyAccess2() throws Exception { + List<String> stmt = new ArrayList<>(); + // this triggers java.lang.RuntimeException: Cannot convert null to int + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, 
NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'"); + stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " + + "FROM FOO " + + "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + Assert.assertEquals(0, values.size()); + } + + @Test + public void testExternalNestedInvalidAccessStringIndexOnArray() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'"); + stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " + + "FROM FOO " + + "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + Assert.assertEquals(0, values.size()); + } + + @Test + public void testExternalNestedArrayOutOfBoundAccess() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'"); + stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " + + "FROM FOO " + + "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200"); StormSql sql = StormSql.construct(); List<Values> values = new ArrayList<>(); ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); @@ -215,9 +258,10 @@ public class TestStormSql { } - @Test + @Test(expected = CompilingClassLoader.CompilerException.class) public void testExternalUdfType2() throws Exception { List<String> stmt = new ArrayList<>(); + // generated code will be not compilable since return type of MYPLUS and type of 'x' are different stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'"); 
stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'"); stmt.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'"); http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java index c3b9ad3..8ea0036 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java @@ -18,23 +18,28 @@ package org.apache.storm.sql.compiler; import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.SqlType; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.prepare.CalciteCatalogReader; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.StreamableTable; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AggregateFunctionImpl; import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; import 
org.apache.calcite.sql.util.ChainedSqlOperatorTable; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; @@ -150,9 +155,32 @@ public class TestCompilerUtils { StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory) .field("ID", SqlTypeName.INTEGER) - .field("MAPFIELD", SqlTypeName.ANY) - .field("NESTEDMAPFIELD", SqlTypeName.ANY) - .field("ARRAYFIELD", SqlTypeName.ANY) + .field("MAPFIELD", + typeFactory.createTypeWithNullability( + typeFactory.createMapType( + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.VARCHAR), true), + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.INTEGER), true)) + , true)) + .field("NESTEDMAPFIELD", + typeFactory.createTypeWithNullability( + typeFactory.createMapType( + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.VARCHAR), true), + typeFactory.createTypeWithNullability( + typeFactory.createMapType( + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.VARCHAR), true), + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.INTEGER), true)) + , true)) + , true)) + .field("ARRAYFIELD", typeFactory.createTypeWithNullability( + typeFactory.createArrayType( + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L) + , true)) .build(); Table table = streamableTable.stream(); schema.add("FOO", table); http://git-wip-us.apache.org/repos/asf/storm/blob/dec76a7e/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java deleted file mode 100644 index ae95fcc..0000000 --- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.sql.compiler; - -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.rel.logical.LogicalProject; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.junit.Test; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.List; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -public class TestExprCompiler { - @Test - public void testLiteral() throws Exception { - String sql = "SELECT 1,1.0,TRUE,'FOO' FROM FOO"; - TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); - JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); - LogicalProject project = (LogicalProject) state.tree; - String[] res = new String[project.getChildExps().size()]; - for (int i = 0; i < 
project.getChildExps().size(); ++i) { - StringWriter sw = new StringWriter(); - try (PrintWriter pw = new PrintWriter(sw)) { - ExprCompiler compiler = new ExprCompiler(pw, typeFactory); - res[i] = project.getChildExps().get(i).accept(compiler); - } - } - - assertArrayEquals(new String[] {"1", "1.0E0", "true", "\"FOO\""}, res); - } - - @Test - public void testInputRef() throws Exception { - String sql = "SELECT ID FROM FOO"; - TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); - JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); - LogicalProject project = (LogicalProject) state.tree; - StringWriter sw = new StringWriter(); - try (PrintWriter pw = new PrintWriter(sw)) { - ExprCompiler compiler = new ExprCompiler(pw, typeFactory); - project.getChildExps().get(0).accept(compiler); - } - - assertThat(sw.toString(), containsString("(java.lang.Integer)(_data.get(0));")); - } - - @Test - public void testCallExpr() throws Exception { - String sql = "SELECT 1>2, 3+5, 1-1.0, 3+ID FROM FOO"; - TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); - JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); - LogicalProject project = (LogicalProject) state.tree; - String[] res = new String[project.getChildExps().size()]; - List<StringWriter> sw = new ArrayList<>(); - for (int i = 0; i < project.getChildExps().size(); ++i) { - sw.add(new StringWriter()); - } - - for (int i = 0; i < project.getChildExps().size(); ++i) { - try (PrintWriter pw = new PrintWriter(sw.get(i))) { - ExprCompiler compiler = new ExprCompiler(pw, typeFactory); - res[i] = project.getChildExps().get(i).accept(compiler); - } - } - assertThat(sw.get(0).toString(), containsString("1 > 2")); - assertThat(sw.get(1).toString(), containsString("plus(3,5)")); - assertThat(sw.get(2).toString(), containsString("minus(1,1.0E0)")); - assertThat(sw.get(3).toString(), containsString("plus(3,")); - } -}