Repository: storm Updated Branches: refs/heads/master 1db08472c -> a41fef386
[STORM-1586] Added UDF support in ExprCompiler Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/521b367a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/521b367a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/521b367a Branch: refs/heads/master Commit: 521b367aeec0526b762c5a5a9b2bea2b373e5fd5 Parents: 87e3c24 Author: Arun Mahadevan <[email protected]> Authored: Fri Feb 26 13:11:32 2016 +0530 Committer: Arun Mahadevan <[email protected]> Committed: Tue Mar 1 12:19:24 2016 +0530 ---------------------------------------------------------------------- .../apache/storm/sql/compiler/ExprCompiler.java | 26 +++++++++++++++++--- .../storm/sql/compiler/TestCompilerUtils.java | 15 ++++++++++- .../backends/standalone/TestPlanCompiler.java | 16 ++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/521b367a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java index c43c32f..df0c27f 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java @@ -208,9 +208,22 @@ public class ExprCompiler implements RexVisitor<String> { this.translators = builder.build(); } + private CallExprPrinter getCallExprPrinter(SqlOperator op) { + if (translators.containsKey(op)) { + return translators.get(op); + } else if (op instanceof SqlUserDefinedFunction) { + Function function = ((SqlUserDefinedFunction) op).getFunction(); + if (function instanceof ReflectiveFunctionBase) { + Method 
method = ((ReflectiveFunctionBase) function).method; + return methodCall(op, method, NullPolicy.STRICT).getValue(); + } + } + return null; + } + private String compile(ExprCompiler compiler, RexCall call) { SqlOperator op = call.getOperator(); - CallExprPrinter printer = translators.get(op); + CallExprPrinter printer = getCallExprPrinter(op); if (printer == null) { throw new UnsupportedOperationException(); } else { @@ -218,8 +231,8 @@ public class ExprCompiler implements RexVisitor<String> { } } - private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod( - final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) { + private Map.Entry<SqlOperator, CallExprPrinter> methodCall( + final SqlOperator op, final Method method, NullPolicy nullPolicy) { if (nullPolicy != NullPolicy.STRICT) { throw new UnsupportedOperationException(); } @@ -240,7 +253,7 @@ public class ExprCompiler implements RexVisitor<String> { pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, arg)); } } - String calc = printMethodCall(method.method, args); + String calc = printMethodCall(method, args); pw.print(String.format("else { %1$s = %2$s; }\n", val, calc)); return val; } @@ -248,6 +261,11 @@ public class ExprCompiler implements RexVisitor<String> { return new AbstractMap.SimpleImmutableEntry<>(op, printer); } + private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod( + final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) { + return methodCall(op, method.method, nullPolicy); + } + private Map.Entry<SqlOperator, CallExprPrinter> infixBinary (final SqlOperator op, final String javaOperator, final Class<?> clazz, final String backupMethodName) { CallExprPrinter trans = new CallExprPrinter() { http://git-wip-us.apache.org/repos/asf/storm/blob/521b367a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java ---------------------------------------------------------------------- diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java index 43b54f7..8a14eee 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java @@ -46,6 +46,12 @@ import java.util.List; public class TestCompilerUtils { + public static class MyPlus { + public static Integer eval(Integer x, Integer y) { + return x + y; + } + } + public static CalciteState sqlOverDummyTable(String sql) throws RelConversionException, ValidationException, SqlParseException { SchemaPlus schema = Frameworks.createRootSchema(true); @@ -83,8 +89,15 @@ public class TestCompilerUtils { Table table = streamableTable.stream(); schema.add("FOO", table); schema.add("BAR", table); + schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval")); + List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); + sqlOperatorTables.add(SqlStdOperatorTable.instance()); + sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), + false, + Collections.<String>emptyList(), typeFactory)); + SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables); FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema( - schema).build(); + schema).operatorTable(chainedSqlOperatorTable).build(); Planner planner = Frameworks.getPlanner(config); SqlNode parse = planner.parse(sql); SqlNode validate = planner.validate(parse); http://git-wip-us.apache.org/repos/asf/storm/blob/521b367a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java index 414aeee..547114f 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java @@ -86,4 +86,20 @@ public class TestPlanCompiler { Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map); Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0)); } + + @Test + public void testUdf() throws Exception { + String sql = "SELECT MYPLUS(ID, 3)" + + "FROM FOO " + + "WHERE ID = 2"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql); + PlanCompiler compiler = new PlanCompiler(typeFactory); + AbstractValuesProcessor proc = compiler.compile(state.tree()); + Map<String, DataSource> data = new HashMap<>(); + data.put("FOO", new TestUtils.MockDataSource()); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + proc.initialize(data, h); + Assert.assertEquals(new Values(5), values.get(0)); + } }
