Repository: storm
Updated Branches:
  refs/heads/1.x-branch a8522bed5 -> ddc3c0458


Merge branch 'STORM-1585' of https://github.com/arunmahadevan/storm into STORM-1585


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/291bad33
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/291bad33
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/291bad33

Branch: refs/heads/1.x-branch
Commit: 291bad3391f2a403fc796e225fe6a085a0be9dd8
Parents: a8522be
Author: Sriharsha Chintalapani <[email protected]>
Authored: Thu Apr 14 08:20:42 2016 -0700
Committer: Sriharsha Chintalapani <[email protected]>
Committed: Thu Apr 14 10:29:30 2016 -0700

----------------------------------------------------------------------
 .../storm-sql-core/src/codegen/data/Parser.tdd  |  4 +-
 .../src/codegen/includes/parserImpls.ftl        | 25 ++++++
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  | 44 +++++++--
 .../apache/storm/sql/compiler/ExprCompiler.java | 26 +++++-
 .../storm/sql/parser/SqlCreateFunction.java     | 94 ++++++++++++++++++++
 .../test/org/apache/storm/sql/TestStormSql.java | 67 +++++++++++++-
 .../storm/sql/compiler/TestCompilerUtils.java   | 15 +++-
 .../backends/standalone/TestPlanCompiler.java   | 16 ++++
 .../apache/storm/sql/parser/TestSqlParser.java  |  6 ++
 .../test/org/apache/storm/sql/TestUtils.java    |  6 ++
 10 files changed, 289 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
index db3a675..1e921c7 100644
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -34,11 +34,13 @@
     "OUTPUTFORMAT",
     "STORED",
     "TBLPROPERTIES",
+    "JAR"
   ]
 
   # List of methods for parsing custom SQL statements.
   statementParserMethods: [
-    "SqlCreateTable()"
+    "SqlCreateTable()",
+    "SqlCreateFunction()"
   ]
 
   # List of methods for parsing custom literals.

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
index 72a8546..0013231 100644
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -83,4 +83,29 @@ SqlNode SqlCreateTable() :
         input_format_class_name, output_format_class_name, location,
         tbl_properties, select);
     }
+}
+
+/**
+ * CREATE FUNCTION functionname AS 'classname' [USING JAR 'jarname']
+ */
+SqlNode SqlCreateFunction() :
+{
+    SqlParserPos pos;
+    SqlIdentifier functionName;
+    SqlNode className;
+    SqlNode jarName = null;
+}
+{
+    <CREATE> { pos = getPos(); }
+    <FUNCTION>
+        functionName = CompoundIdentifier()
+    <AS>
+        className = StringLiteral()
+    [
+      <USING> <JAR>
+      jarName = StringLiteral()
+    ]
+    {
+      return new SqlCreateFunction(pos, functionName, className, jarName);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index 7e5dfcc..bf5a921 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -17,6 +17,12 @@
  */
 package org.apache.storm.sql;
 
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.SubmitOptions;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -34,6 +40,7 @@ import 
org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
 import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.parser.ColumnConstraint;
 import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
 import org.apache.storm.sql.runtime.*;
@@ -47,6 +54,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +69,7 @@ class StormSqlImpl extends StormSql {
   private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
       RelDataTypeSystem.DEFAULT);
   private final SchemaPlus schema = Frameworks.createRootSchema(true);
+  private boolean hasUdf = false;
 
   @Override
   public void execute(
@@ -72,9 +81,10 @@ class StormSqlImpl extends StormSql {
       SqlNode node = parser.impl().parseSqlStmtEof();
       if (node instanceof SqlCreateTable) {
         handleCreateTable((SqlCreateTable) node, dataSources);
+      } else if (node instanceof SqlCreateFunction) {
+        handleCreateFunction((SqlCreateFunction) node);
       } else {
-        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-            schema).build();
+        FrameworkConfig config = buildFrameWorkConfig();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
@@ -97,9 +107,10 @@ class StormSqlImpl extends StormSql {
       SqlNode node = parser.impl().parseSqlStmtEof();
       if (node instanceof SqlCreateTable) {
         handleCreateTableForTrident((SqlCreateTable) node, dataSources);
-      } else {
-        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-            schema).build();
+      } else if (node instanceof SqlCreateFunction) {
+        handleCreateFunction((SqlCreateFunction) node);
+      }  else {
+        FrameworkConfig config = buildFrameWorkConfig();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
@@ -153,6 +164,15 @@ class StormSqlImpl extends StormSql {
     dataSources.put(n.tableName(), ds);
   }
 
+  private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) 
throws ClassNotFoundException {
+    if(sqlCreateFunction.jarName() != null) {
+      throw new UnsupportedOperationException("UDF 'USING JAR' not 
implemented");
+    }
+    schema.add(sqlCreateFunction.functionName().toUpperCase(),
+               
ScalarFunctionImpl.create(Class.forName(sqlCreateFunction.className()), 
"evaluate"));
+    hasUdf = true;
+  }
+
   private void handleCreateTableForTrident(
       SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) {
     List<FieldInfo> fields = updateSchema(n);
@@ -184,4 +204,18 @@ class StormSqlImpl extends StormSql {
     schema.add(n.tableName(), table);
     return fields;
   }
+
+  private FrameworkConfig buildFrameWorkConfig() {
+    if (hasUdf) {
+      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+      sqlOperatorTables.add(SqlStdOperatorTable.instance());
+      sqlOperatorTables.add(new 
CalciteCatalogReader(CalciteSchema.from(schema),
+                                                     false,
+                                                     
Collections.<String>emptyList(), typeFactory));
+      return Frameworks.newConfigBuilder().defaultSchema(schema)
+              .operatorTable(new 
ChainedSqlOperatorTable(sqlOperatorTables)).build();
+    } else {
+      return Frameworks.newConfigBuilder().defaultSchema(schema).build();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index c43c32f..df0c27f 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -208,9 +208,22 @@ public class ExprCompiler implements RexVisitor<String> {
       this.translators = builder.build();
     }
 
+    private CallExprPrinter getCallExprPrinter(SqlOperator op) {
+      if (translators.containsKey(op)) {
+        return translators.get(op);
+      } else if (op instanceof SqlUserDefinedFunction) {
+        Function function = ((SqlUserDefinedFunction) op).getFunction();
+        if (function instanceof ReflectiveFunctionBase) {
+          Method method = ((ReflectiveFunctionBase) function).method;
+          return methodCall(op, method, NullPolicy.STRICT).getValue();
+        }
+      }
+      return null;
+    }
+
     private String compile(ExprCompiler compiler, RexCall call) {
       SqlOperator op = call.getOperator();
-      CallExprPrinter printer = translators.get(op);
+      CallExprPrinter printer = getCallExprPrinter(op);
       if (printer == null) {
         throw new UnsupportedOperationException();
       } else {
@@ -218,8 +231,8 @@ public class ExprCompiler implements RexVisitor<String> {
       }
     }
 
-    private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
-            final SqlOperator op, final BuiltInMethod method, NullPolicy 
nullPolicy) {
+    private Map.Entry<SqlOperator, CallExprPrinter> methodCall(
+            final SqlOperator op, final Method method, NullPolicy nullPolicy) {
       if (nullPolicy != NullPolicy.STRICT) {
         throw new UnsupportedOperationException();
       }
@@ -240,7 +253,7 @@ public class ExprCompiler implements RexVisitor<String> {
               pw.print(String.format("else if (%2$s == null) { %1$s = null; 
}\n", val, arg));
             }
           }
-          String calc = printMethodCall(method.method, args);
+          String calc = printMethodCall(method, args);
           pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
           return val;
         }
@@ -248,6 +261,11 @@ public class ExprCompiler implements RexVisitor<String> {
       return new AbstractMap.SimpleImmutableEntry<>(op, printer);
     }
 
+    private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
+        final SqlOperator op, final BuiltInMethod method, NullPolicy 
nullPolicy) {
+      return methodCall(op, method.method, nullPolicy);
+    }
+
     private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
         (final SqlOperator op, final String javaOperator, final Class<?> 
clazz, final String backupMethodName) {
       CallExprPrinter trans = new CallExprPrinter() {

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
new file mode 100644
index 0000000..a53802c
--- /dev/null
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+
+public class SqlCreateFunction extends SqlCall {
+    public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+            "CREATE_FUNCTION", SqlKind.OTHER) {
+        @Override
+        public SqlCall createCall(
+                SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
+            assert functionQualifier == null;
+            return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1], 
o[2]);
+        }
+
+        @Override
+        public void unparse(
+                SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+            SqlCreateFunction t = (SqlCreateFunction) call;
+            UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
+            u.keyword("CREATE", 
"FUNCTION").node(t.functionName).keyword("AS").node(t.className);
+            if (t.jarName != null) {
+                u.keyword("USING", "JAR").node(t.jarName);
+            }
+        }
+    };
+
+    private final SqlIdentifier functionName;
+    private final SqlNode className;
+    private final SqlNode jarName;
+
+    public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, 
SqlNode className, SqlNode jarName) {
+        super(pos);
+        this.functionName = functionName;
+        this.className = className;
+        this.jarName = jarName;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(functionName, className);
+    }
+
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        getOperator().unparse(writer, this, leftPrec, rightPrec);
+    }
+
+    public String functionName() {
+        return functionName.toString();
+    }
+
+    public String className() {
+        return ((NlsString)SqlLiteral.value(className)).getValue();
+    }
+
+    public String jarName() {
+        return jarName == null ? null : 
((NlsString)SqlLiteral.value(jarName)).getValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index a85a907..872b83d 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,8 +18,15 @@
 package org.apache.storm.sql;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.sql.runtime.*;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -129,4 +136,58 @@ public class TestStormSql {
     sql.execute(stmt, h);
     Assert.assertEquals(0, values.size());
   }
+
+  @Test(expected = ValidationException.class)
+  public void testExternalUdfType() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 
'mock:///foo'");
+    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
+    stmt.add("SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    System.out.println(values);
+
+  }
+
+  @Test
+  public void testExternalUdfType2() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 
'mock:///foo'");
+    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
+    stmt.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(0, values.size());
+  }
+
+  @Test
+  public void testExternalUdf() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus'");
+    stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(2, values.size());
+    Assert.assertEquals(4, values.get(0).get(0));
+    Assert.assertEquals(5, values.get(1).get(0));
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testExternalUdfUsingJar() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+    stmt.add("CREATE FUNCTION MYPLUS AS 
'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
+    stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 43b54f7..8a14eee 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -46,6 +46,12 @@ import java.util.List;
 
 public class TestCompilerUtils {
 
+    public static class MyPlus {
+        public static Integer eval(Integer x, Integer y) {
+            return x + y;
+        }
+    }
+
     public static CalciteState sqlOverDummyTable(String sql)
             throws RelConversionException, ValidationException, 
SqlParseException {
         SchemaPlus schema = Frameworks.createRootSchema(true);
@@ -83,8 +89,15 @@ public class TestCompilerUtils {
         Table table = streamableTable.stream();
         schema.add("FOO", table);
         schema.add("BAR", table);
+        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+        sqlOperatorTables.add(SqlStdOperatorTable.instance());
+        sqlOperatorTables.add(new 
CalciteCatalogReader(CalciteSchema.from(schema),
+                                                       false,
+                                                       
Collections.<String>emptyList(), typeFactory));
+        SqlOperatorTable chainedSqlOperatorTable = new 
ChainedSqlOperatorTable(sqlOperatorTables);
         FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-                schema).build();
+                schema).operatorTable(chainedSqlOperatorTable).build();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
index 414aeee..547114f 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -86,4 +86,20 @@ public class TestPlanCompiler {
     Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
     Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 
300)), values.get(0));
   }
+
+  @Test
+  public void testUdf() throws Exception {
+    String sql = "SELECT MYPLUS(ID, 3)" +
+            "FROM FOO " +
+            "WHERE ID = 2";
+    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverNestedTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    AbstractValuesProcessor proc = compiler.compile(state.tree());
+    Map<String, DataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockDataSource());
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    proc.initialize(data, h);
+    Assert.assertEquals(new Values(5), values.get(0));
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
index b957565..68054d8 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
@@ -41,6 +41,12 @@ public class TestSqlParser {
     parse(sql);
   }
 
+  @Test
+  public void testCreateFunction() throws Exception {
+    String sql = "CREATE FUNCTION foo AS 'org.apache.storm.sql.MyUDF'";
+    parse(sql);
+  }
+
   private static SqlNode parse(String sql) throws Exception {
     StormParser parser = new StormParser(sql);
     return parser.impl().parseSqlStmtEof();

http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java 
b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index da763a7..5091e3a 100644
--- 
a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ 
b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -41,6 +41,12 @@ import java.util.List;
 import java.util.Map;
 
 public class TestUtils {
+  public static class MyPlus {
+    public static Integer evaluate(Integer x, Integer y) {
+      return x + y;
+    }
+  }
+
   public static class MockDataSource implements DataSource {
     private final ArrayList<Values> RECORDS = new ArrayList<>();
 

Reply via email to