[STORM-1585] Add DDL support for UDFs in Storm-sql

This patch exposes the user-defined function (UDF) support added in
STORM-1586 through a new DDL statement of the form
CREATE FUNCTION functionname AS 'classname'.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0047279a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0047279a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0047279a

Branch: refs/heads/master
Commit: 0047279a71a98a4326c00929ca990e415d0fbce1
Parents: 521b367
Author: Arun Mahadevan <[email protected]>
Authored: Mon Feb 29 23:56:30 2016 +0530
Committer: Arun Mahadevan <[email protected]>
Committed: Tue Mar 1 13:58:16 2016 +0530

----------------------------------------------------------------------
 .../storm-sql-core/src/codegen/data/Parser.tdd  |  3 +-
 .../src/codegen/includes/parserImpls.ftl        | 19 ++++++
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  | 41 ++++++++++--
 .../storm/sql/parser/SqlCreateFunction.java     | 68 ++++++++++++++++++++
 .../test/org/apache/storm/sql/TestStormSql.java | 23 ++++++-
 .../apache/storm/sql/parser/TestSqlParser.java  |  6 ++
 .../test/org/apache/storm/sql/TestUtils.java    |  6 ++
 7 files changed, 159 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
index db3a675..2ddf111 100644
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -38,7 +38,8 @@
 
   # List of methods for parsing custom SQL statements.
   statementParserMethods: [
-    "SqlCreateTable()"
+    "SqlCreateTable()",
+    "SqlCreateFunction()"
   ]
 
   # List of methods for parsing custom literals.

http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
index 72a8546..c26d3a7 100644
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -83,4 +83,23 @@ SqlNode SqlCreateTable() :
         input_format_class_name, output_format_class_name, location,
         tbl_properties, select);
     }
+}
+
+/**
+ * CREATE FUNCTION functionname AS 'classname'
+ */
+SqlNode SqlCreateFunction() :
+{
+    SqlParserPos pos;
+    SqlIdentifier functionName;
+    SqlNode className;
+}
+{
+    <CREATE> { pos = getPos(); }
+    <FUNCTION>
+    functionName = CompoundIdentifier()
+    <AS>
+    className = StringLiteral() {
+        return new SqlCreateFunction(pos, functionName, className);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index 7e5dfcc..b4bba8e 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -17,6 +17,12 @@
  */
 package org.apache.storm.sql;
 
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.SubmitOptions;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -34,6 +40,7 @@ import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
 import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.parser.ColumnConstraint;
 import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
 import org.apache.storm.sql.runtime.*;
@@ -47,6 +54,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +69,7 @@ class StormSqlImpl extends StormSql {
   private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
       RelDataTypeSystem.DEFAULT);
   private final SchemaPlus schema = Frameworks.createRootSchema(true);
+  private boolean hasUdf = false;
 
   @Override
   public void execute(
@@ -72,9 +81,10 @@ class StormSqlImpl extends StormSql {
       SqlNode node = parser.impl().parseSqlStmtEof();
       if (node instanceof SqlCreateTable) {
         handleCreateTable((SqlCreateTable) node, dataSources);
+      } else if (node instanceof SqlCreateFunction) {
+        handleCreateFunction((SqlCreateFunction) node);
       } else {
-        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-            schema).build();
+        FrameworkConfig config = buildFrameWorkConfig();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
@@ -97,9 +107,10 @@ class StormSqlImpl extends StormSql {
       SqlNode node = parser.impl().parseSqlStmtEof();
       if (node instanceof SqlCreateTable) {
         handleCreateTableForTrident((SqlCreateTable) node, dataSources);
-      } else {
-        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-            schema).build();
+      } else if (node instanceof SqlCreateFunction) {
+        handleCreateFunction((SqlCreateFunction) node);
+      }  else {
+        FrameworkConfig config = buildFrameWorkConfig();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
@@ -153,6 +164,12 @@ class StormSqlImpl extends StormSql {
     dataSources.put(n.tableName(), ds);
   }
 
+  private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException {
+    schema.add(sqlCreateFunction.functionName().toUpperCase(),
+               ScalarFunctionImpl.create(Class.forName(sqlCreateFunction.className()), "evaluate"));
+    hasUdf = true;
+  }
+
   private void handleCreateTableForTrident(
       SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) {
     List<FieldInfo> fields = updateSchema(n);
@@ -184,4 +201,18 @@ class StormSqlImpl extends StormSql {
     schema.add(n.tableName(), table);
     return fields;
   }
+
+  private FrameworkConfig buildFrameWorkConfig() {
+    if (hasUdf) {
+      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+      sqlOperatorTables.add(SqlStdOperatorTable.instance());
+      sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                                                     false,
+                                                     Collections.<String>emptyList(), typeFactory));
+      return Frameworks.newConfigBuilder().defaultSchema(schema)
+              .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
+    } else {
+      return Frameworks.newConfigBuilder().defaultSchema(schema).build();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
new file mode 100644
index 0000000..5dcd7d1
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
@@ -0,0 +1,68 @@
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+
+public class SqlCreateFunction extends SqlCall {
+    public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+            "CREATE_FUNCTION", SqlKind.OTHER) {
+        @Override
+        public SqlCall createCall(
+                SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
+            assert functionQualifier == null;
+            return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1]);
+        }
+
+        @Override
+        public void unparse(
+                SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+            SqlCreateFunction t = (SqlCreateFunction) call;
+            UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
+            u.keyword("CREATE", "FUNCTION").node(t.functionName).keyword("AS").node(t.className);
+        }
+    };
+
+    private final SqlIdentifier functionName;
+    private final SqlNode className;
+
+    public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className) {
+        super(pos);
+        this.functionName = functionName;
+        this.className = className;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(functionName, className);
+    }
+
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        getOperator().unparse(writer, this, leftPrec, rightPrec);
+    }
+
+    public String functionName() {
+        return functionName.toString();
+    }
+
+    public String className() {
+        return ((NlsString)SqlLiteral.value(className)).getValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index a85a907..ce1e27f 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -18,8 +18,13 @@
 package org.apache.storm.sql;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.sql.runtime.*;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -129,4 +134,20 @@ public class TestStormSql {
     sql.execute(stmt, h);
     Assert.assertEquals(0, values.size());
   }
+
+  @Test
+  public void testExternalUdf() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+    stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
+    stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(2, values.size());
+    Assert.assertEquals(4, values.get(0).get(0));
+    Assert.assertEquals(5, values.get(1).get(0));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
index b957565..68054d8 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
@@ -41,6 +41,12 @@ public class TestSqlParser {
     parse(sql);
   }
 
+  @Test
+  public void testCreateFunction() throws Exception {
+    String sql = "CREATE FUNCTION foo AS 'org.apache.storm.sql.MyUDF'";
+    parse(sql);
+  }
+
   private static SqlNode parse(String sql) throws Exception {
     StormParser parser = new StormParser(sql);
     return parser.impl().parseSqlStmtEof();

http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index da763a7..5091e3a 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -41,6 +41,12 @@ import java.util.List;
 import java.util.Map;
 
 public class TestUtils {
+  public static class MyPlus {
+    public static Integer evaluate(Integer x, Integer y) {
+      return x + y;
+    }
+  }
+
   public static class MockDataSource implements DataSource {
     private final ArrayList<Values> RECORDS = new ArrayList<>();
 

Reply via email to