Repository: storm
Updated Branches:
  refs/heads/master 1db08472c -> a41fef386


[STORM-1586] Added UDF support in ExprCompiler


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/521b367a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/521b367a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/521b367a

Branch: refs/heads/master
Commit: 521b367aeec0526b762c5a5a9b2bea2b373e5fd5
Parents: 87e3c24
Author: Arun Mahadevan <[email protected]>
Authored: Fri Feb 26 13:11:32 2016 +0530
Committer: Arun Mahadevan <[email protected]>
Committed: Tue Mar 1 12:19:24 2016 +0530

----------------------------------------------------------------------
 .../apache/storm/sql/compiler/ExprCompiler.java | 26 +++++++++++++++++---
 .../storm/sql/compiler/TestCompilerUtils.java   | 15 ++++++++++-
 .../backends/standalone/TestPlanCompiler.java   | 16 ++++++++++++
 3 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/521b367a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index c43c32f..df0c27f 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -208,9 +208,22 @@ public class ExprCompiler implements RexVisitor<String> {
       this.translators = builder.build();
     }
 
+    private CallExprPrinter getCallExprPrinter(SqlOperator op) {
+      if (translators.containsKey(op)) {
+        return translators.get(op);
+      } else if (op instanceof SqlUserDefinedFunction) {
+        Function function = ((SqlUserDefinedFunction) op).getFunction();
+        if (function instanceof ReflectiveFunctionBase) {
+          Method method = ((ReflectiveFunctionBase) function).method;
+          return methodCall(op, method, NullPolicy.STRICT).getValue();
+        }
+      }
+      return null;
+    }
+
     private String compile(ExprCompiler compiler, RexCall call) {
       SqlOperator op = call.getOperator();
-      CallExprPrinter printer = translators.get(op);
+      CallExprPrinter printer = getCallExprPrinter(op);
       if (printer == null) {
         throw new UnsupportedOperationException();
       } else {
@@ -218,8 +231,8 @@ public class ExprCompiler implements RexVisitor<String> {
       }
     }
 
-    private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
-            final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
+    private Map.Entry<SqlOperator, CallExprPrinter> methodCall(
+            final SqlOperator op, final Method method, NullPolicy nullPolicy) {
       if (nullPolicy != NullPolicy.STRICT) {
         throw new UnsupportedOperationException();
       }
@@ -240,7 +253,7 @@ public class ExprCompiler implements RexVisitor<String> {
               pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, arg));
             }
           }
-          String calc = printMethodCall(method.method, args);
+          String calc = printMethodCall(method, args);
           pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
           return val;
         }
@@ -248,6 +261,11 @@ public class ExprCompiler implements RexVisitor<String> {
       return new AbstractMap.SimpleImmutableEntry<>(op, printer);
     }
 
+    private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
+        final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
+      return methodCall(op, method.method, nullPolicy);
+    }
+
     private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
         (final SqlOperator op, final String javaOperator, final Class<?> clazz, final String backupMethodName) {
       CallExprPrinter trans = new CallExprPrinter() {

http://git-wip-us.apache.org/repos/asf/storm/blob/521b367a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 43b54f7..8a14eee 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -46,6 +46,12 @@ import java.util.List;
 
 public class TestCompilerUtils {
 
+    public static class MyPlus {
+        public static Integer eval(Integer x, Integer y) {
+            return x + y;
+        }
+    }
+
     public static CalciteState sqlOverDummyTable(String sql)
             throws RelConversionException, ValidationException, SqlParseException {
         SchemaPlus schema = Frameworks.createRootSchema(true);
@@ -83,8 +89,15 @@ public class TestCompilerUtils {
         Table table = streamableTable.stream();
         schema.add("FOO", table);
         schema.add("BAR", table);
+        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+        sqlOperatorTables.add(SqlStdOperatorTable.instance());
+        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                                                       false,
+                                                       Collections.<String>emptyList(), typeFactory));
+        SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
         FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-                schema).build();
+                schema).operatorTable(chainedSqlOperatorTable).build();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);

http://git-wip-us.apache.org/repos/asf/storm/blob/521b367a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
index 414aeee..547114f 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -86,4 +86,20 @@ public class TestPlanCompiler {
     Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
     Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
   }
+
+  @Test
+  public void testUdf() throws Exception {
+    String sql = "SELECT MYPLUS(ID, 3)" +
+            "FROM FOO " +
+            "WHERE ID = 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    AbstractValuesProcessor proc = compiler.compile(state.tree());
+    Map<String, DataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockDataSource());
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    proc.initialize(data, h);
+    Assert.assertEquals(new Values(5), values.get(0));
+  }
 }

Reply via email to