Repository: storm Updated Branches: refs/heads/master 2b7a75839 -> 39163bfce
[StormSQL] Implement compiler for expressions. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4e2fe47c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4e2fe47c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4e2fe47c Branch: refs/heads/master Commit: 4e2fe47c3b322552f7dc8f2861ef3e496054006b Parents: b6fa601 Author: Haohui Mai <whe...@apache.org> Authored: Thu Oct 22 16:42:24 2015 -0700 Committer: Haohui Mai <whe...@apache.org> Committed: Fri Dec 4 11:09:01 2015 -0800 ---------------------------------------------------------------------- .../apache/storm/sql/compiler/CompilerUtil.java | 35 +++ .../apache/storm/sql/compiler/ExprCompiler.java | 216 +++++++++++++++++++ .../storm/sql/compiler/TestExprCompiler.java | 90 ++++++++ .../apache/storm/sql/compiler/TestUtils.java | 33 +++ 4 files changed, 374 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4e2fe47c/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java new file mode 100644 index 0000000..5e7453a --- /dev/null +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * String helpers shared by the StormSQL code generators.
 */
class CompilerUtil {
  /**
   * Renders {@code s} as the source text of a Java string literal, including
   * the surrounding double quotes.
   *
   * <p>Backslashes, double quotes, line feeds and carriage returns are each
   * replaced by their two-character escape sequence; every other character is
   * copied through unchanged. Each escaped character maps to exactly one
   * escape, so a line feed followed by a carriage return is emitted as
   * {@code \n\r} rather than being collapsed into a single {@code \n}.
   *
   * @param s the raw string to quote, may be {@code null}
   * @param nullMeansNull when {@code s} is {@code null}: {@code true} yields
   *        the text {@code null}, {@code false} yields an empty string literal
   * @return Java source text for the literal
   */
  static String escapeJavaString(String s, boolean nullMeansNull) {
    if (s == null) {
      return nullMeansNull ? "null" : "\"\"";
    }
    // Backslashes go first; otherwise the backslashes introduced by the
    // later escapes would themselves be doubled.
    String escaped = s.replace("\\", "\\\\")
        .replace("\"", "\\\"")
        .replace("\n", "\\n")
        .replace("\r", "\\r");
    return "\"" + escaped + "\"";
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.sql.compiler; + +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.util.NlsString; +import org.apache.calcite.util.Util; + +import java.io.PrintWriter; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.util.AbstractMap; +import java.util.IdentityHashMap; +import java.util.Map; + +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE_INTEGER; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN_OR_EQUAL; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.MINUS; +import static 
org.apache.calcite.sql.fun.SqlStdOperatorTable.MULTIPLY; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.PLUS; + +/** + * Compile RexNode on top of the Tuple abstraction. + */ +class ExprCompiler implements RexVisitor<String> { + private final PrintWriter pw; + private final Map<RexNode, String> expr = new IdentityHashMap<>(); + private final JavaTypeFactory typeFactory; + private static final ImpTable IMP_TABLE = new ImpTable(); + + ExprCompiler(PrintWriter pw, JavaTypeFactory typeFactory) { + this.pw = pw; + this.typeFactory = typeFactory; + } + + @Override + public String visitInputRef(RexInputRef rexInputRef) { + if (expr.containsKey(rexInputRef)) { + return expr.get(rexInputRef); + } + String name = printExpr(rexInputRef, String.format("_data.get(%d)", + rexInputRef.getIndex())); + expr.put(rexInputRef, name); + return name; + } + + @Override + public String visitLocalRef(RexLocalRef rexLocalRef) { + throw new UnsupportedOperationException(); + } + + @Override + public String visitLiteral(RexLiteral rexLiteral) { + Object v = rexLiteral.getValue(); + RelDataType ty = rexLiteral.getType(); + switch(rexLiteral.getTypeName()) { + case BOOLEAN: + return v.toString(); + case CHAR: + return CompilerUtil.escapeJavaString(((NlsString) v).getValue(), true); + case NULL: + return "null"; + case DOUBLE: + case BIGINT: + case DECIMAL: + switch (ty.getSqlTypeName()) { + case TINYINT: + case SMALLINT: + case INTEGER: + return Long.toString(((BigDecimal) v).longValueExact()); + case BIGINT: + return Long.toString(((BigDecimal)v).longValueExact()) + 'L'; + case DECIMAL: + case FLOAT: + case REAL: + case DOUBLE: + return Util.toScientificNotation((BigDecimal) v); + } + break; + default: + throw new UnsupportedOperationException(); + } + return null; + } + + @Override + public String visitCall(RexCall rexCall) { + return IMP_TABLE.compile(this, rexCall); + } + + @Override + public String visitOver(RexOver rexOver) { + throw new UnsupportedOperationException(); + } 
+ + @Override + public String visitCorrelVariable( + RexCorrelVariable rexCorrelVariable) { + throw new UnsupportedOperationException(); + } + + @Override + public String visitDynamicParam( + RexDynamicParam rexDynamicParam) { + throw new UnsupportedOperationException(); + } + + @Override + public String visitRangeRef(RexRangeRef rexRangeRef) { + throw new UnsupportedOperationException(); + } + + @Override + public String visitFieldAccess( + RexFieldAccess rexFieldAccess) { + throw new UnsupportedOperationException(); + } + + private String printExpr(RexNode node, String definition) { + String name = "t" + expr.size(); + Type ty = typeFactory.getJavaClass(node.getType()); + String typeName = ((Class<?>)ty).getCanonicalName(); + pw.append( + String.format("%s %s = (%s)(%s);\n", typeName, name, typeName, definition)); + return name; + } + + private interface CallExprPrinter { + String translate(ExprCompiler compiler, RexCall call); + } + + /** + * Inspired by Calcite's RexImpTable, the ImpTable class maps the operators + * to their corresponding implementation that generates the expressions in + * the format of Java source code. 
+ */ + private static class ImpTable { + private final Map<SqlOperator, CallExprPrinter> translators; + + private ImpTable() { + ImmutableMap.Builder<SqlOperator, CallExprPrinter> builder = + ImmutableMap.builder(); + builder.put(infixBinary(LESS_THAN, "<")) + .put(infixBinary(LESS_THAN_OR_EQUAL, "<=")) + .put(infixBinary(GREATER_THAN, ">")) + .put(infixBinary(GREATER_THAN_OR_EQUAL, ">=")) + .put(infixBinary(PLUS, "+")) + .put(infixBinary(MINUS, "-")) + .put(infixBinary(MULTIPLY, "*")) + .put(infixBinary(DIVIDE, "/")) + .put(infixBinary(DIVIDE_INTEGER, "/")); + this.translators = builder.build(); + } + + private String compile(ExprCompiler compiler, RexCall call) { + SqlOperator op = call.getOperator(); + CallExprPrinter printer = translators.get(op); + if (printer == null) { + throw new UnsupportedOperationException(); + } else { + return printer.translate(compiler, call); + } + } + + private Map.Entry<SqlOperator, CallExprPrinter> infixBinary + (SqlOperator op, final String javaOperator) { + CallExprPrinter trans = new CallExprPrinter() { + @Override + public String translate( + ExprCompiler compiler, RexCall call) { + int size = call.getOperands().size(); + assert size == 2; + String[] ops = new String[size]; + for (int i = 0; i < size; ++i) { + ops[i] = call.getOperands().get(i).accept(compiler); + } + return String.format("%s %s %s", ops[0], javaOperator, ops[1]); + } + }; + return new AbstractMap.SimpleImmutableEntry<>(op, trans); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4e2fe47c/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java new file mode 100644 index 0000000..6409d63 --- /dev/null +++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.sql.compiler; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.junit.Test; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class TestExprCompiler { + @Test + public void testLiteral() throws Exception { + String sql = "SELECT 1,1.0,TRUE,'FOO' FROM FOO"; + TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql); + JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + LogicalProject project = (LogicalProject) state.tree; + String[] res = new String[project.getChildExps().size()]; + for (int i = 0; i < project.getChildExps().size(); ++i) { + StringWriter sw = new StringWriter(); + try (PrintWriter pw = new PrintWriter(sw)) { + 
ExprCompiler compiler = new ExprCompiler(pw, typeFactory); + res[i] = project.getChildExps().get(i).accept(compiler); + } + } + + assertArrayEquals(new String[] {"1", "1.0E0", "true", "\"FOO\""}, res); + } + + @Test + public void testInputRef() throws Exception { + String sql = "SELECT ID FROM FOO"; + TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql); + JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + LogicalProject project = (LogicalProject) state.tree; + StringWriter sw = new StringWriter(); + try (PrintWriter pw = new PrintWriter(sw)) { + ExprCompiler compiler = new ExprCompiler(pw, typeFactory); + project.getChildExps().get(0).accept(compiler); + } + + assertEquals("int t0 = (int)(_data.get(0));\n", sw.toString()); + } + + @Test + public void testCallExpr() throws Exception { + String sql = "SELECT 1>2, 3+5, 1-1.0, 3+ID FROM FOO"; + TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql); + JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + LogicalProject project = (LogicalProject) state.tree; + String[] res = new String[project.getChildExps().size()]; + List<StringWriter> sw = new ArrayList<>(); + for (int i = 0; i < project.getChildExps().size(); ++i) { + sw.add(new StringWriter()); + } + + for (int i = 0; i < project.getChildExps().size(); ++i) { + try (PrintWriter pw = new PrintWriter(sw.get(i))) { + ExprCompiler compiler = new ExprCompiler(pw, typeFactory); + res[i] = project.getChildExps().get(i).accept(compiler); + } + } + assertArrayEquals(new String[]{"1 > 2", "3 + 5", "1 - 1.0E0", "3 + t0"}, + res); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/4e2fe47c/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java index c16cc49..ae4300a 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java @@ -1,18 +1,41 @@ package org.apache.storm.sql.compiler; import com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Statistic; import org.apache.calcite.schema.Statistics; import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; import org.apache.calcite.util.ImmutableBitSet; import java.util.ArrayList; class TestUtils { + static CalciteState sqlOverDummyTable(String sql) + throws RelConversionException, ValidationException, SqlParseException { + SchemaPlus schema = Frameworks.createRootSchema(true); + Table table = newTable().field("ID", SqlTypeName.INTEGER).build(); + schema.add("FOO", table); + FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema( + schema).build(); + Planner planner = Frameworks.getPlanner(config); + SqlNode parse = planner.parse(sql); + SqlNode validate = planner.validate(parse); + RelNode tree = planner.convert(validate); + return new CalciteState(schema, tree); + } + static class TableBuilderInfo { private static class FieldType { private static final int NO_PRECISION = -1; @@ -89,4 +112,14 @@ class TestUtils { static 
TableBuilderInfo newTable() { return new TableBuilderInfo(); } + + static class CalciteState { + final SchemaPlus schema; + final RelNode tree; + + private CalciteState(SchemaPlus schema, RelNode tree) { + this.schema = schema; + this.tree = tree; + } + } }