[STORM-1585] Add DDL support for UDFs in Storm-sql. This patch proposes to expose the user-defined function (UDF) support added in STORM-1586 via DDL statements.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0047279a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0047279a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0047279a Branch: refs/heads/master Commit: 0047279a71a98a4326c00929ca990e415d0fbce1 Parents: 521b367 Author: Arun Mahadevan <[email protected]> Authored: Mon Feb 29 23:56:30 2016 +0530 Committer: Arun Mahadevan <[email protected]> Committed: Tue Mar 1 13:58:16 2016 +0530 ---------------------------------------------------------------------- .../storm-sql-core/src/codegen/data/Parser.tdd | 3 +- .../src/codegen/includes/parserImpls.ftl | 19 ++++++ .../jvm/org/apache/storm/sql/StormSqlImpl.java | 41 ++++++++++-- .../storm/sql/parser/SqlCreateFunction.java | 68 ++++++++++++++++++++ .../test/org/apache/storm/sql/TestStormSql.java | 23 ++++++- .../apache/storm/sql/parser/TestSqlParser.java | 6 ++ .../test/org/apache/storm/sql/TestUtils.java | 6 ++ 7 files changed, 159 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd index db3a675..2ddf111 100644 --- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd +++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd @@ -38,7 +38,8 @@ # List of methods for parsing custom SQL statements. statementParserMethods: [ - "SqlCreateTable()" + "SqlCreateTable()", + "SqlCreateFunction()" ] # List of methods for parsing custom literals. 
http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl index 72a8546..c26d3a7 100644 --- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl +++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl @@ -83,4 +83,23 @@ SqlNode SqlCreateTable() : input_format_class_name, output_format_class_name, location, tbl_properties, select); } +} + +/** + * CREATE FUNCTION functionname AS 'classname' + */ +SqlNode SqlCreateFunction() : +{ + SqlParserPos pos; + SqlIdentifier functionName; + SqlNode className; +} +{ + <CREATE> { pos = getPos(); } + <FUNCTION> + functionName = CompoundIdentifier() + <AS> + className = StringLiteral() { + return new SqlCreateFunction(pos, functionName, className); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java index 7e5dfcc..b4bba8e 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java @@ -17,6 +17,12 @@ */ package org.apache.storm.sql; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; import 
org.apache.storm.StormSubmitter; import org.apache.storm.generated.SubmitOptions; import org.apache.calcite.adapter.java.JavaTypeFactory; @@ -34,6 +40,7 @@ import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler; import org.apache.storm.sql.javac.CompilingClassLoader; import org.apache.storm.sql.parser.ColumnConstraint; import org.apache.storm.sql.parser.ColumnDefinition; +import org.apache.storm.sql.parser.SqlCreateFunction; import org.apache.storm.sql.parser.SqlCreateTable; import org.apache.storm.sql.parser.StormParser; import org.apache.storm.sql.runtime.*; @@ -47,6 +54,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +69,7 @@ class StormSqlImpl extends StormSql { private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT); private final SchemaPlus schema = Frameworks.createRootSchema(true); + private boolean hasUdf = false; @Override public void execute( @@ -72,9 +81,10 @@ class StormSqlImpl extends StormSql { SqlNode node = parser.impl().parseSqlStmtEof(); if (node instanceof SqlCreateTable) { handleCreateTable((SqlCreateTable) node, dataSources); + } else if (node instanceof SqlCreateFunction) { + handleCreateFunction((SqlCreateFunction) node); } else { - FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema( - schema).build(); + FrameworkConfig config = buildFrameWorkConfig(); Planner planner = Frameworks.getPlanner(config); SqlNode parse = planner.parse(sql); SqlNode validate = planner.validate(parse); @@ -97,9 +107,10 @@ class StormSqlImpl extends StormSql { SqlNode node = parser.impl().parseSqlStmtEof(); if (node instanceof SqlCreateTable) { handleCreateTableForTrident((SqlCreateTable) node, dataSources); - } else { - FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema( - schema).build(); + } else 
if (node instanceof SqlCreateFunction) { + handleCreateFunction((SqlCreateFunction) node); + } else { + FrameworkConfig config = buildFrameWorkConfig(); Planner planner = Frameworks.getPlanner(config); SqlNode parse = planner.parse(sql); SqlNode validate = planner.validate(parse); @@ -153,6 +164,12 @@ class StormSqlImpl extends StormSql { dataSources.put(n.tableName(), ds); } + private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException { + schema.add(sqlCreateFunction.functionName().toUpperCase(), + ScalarFunctionImpl.create(Class.forName(sqlCreateFunction.className()), "evaluate")); + hasUdf = true; + } + private void handleCreateTableForTrident( SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) { List<FieldInfo> fields = updateSchema(n); @@ -184,4 +201,18 @@ class StormSqlImpl extends StormSql { schema.add(n.tableName(), table); return fields; } + + private FrameworkConfig buildFrameWorkConfig() { + if (hasUdf) { + List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); + sqlOperatorTables.add(SqlStdOperatorTable.instance()); + sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), + false, + Collections.<String>emptyList(), typeFactory)); + return Frameworks.newConfigBuilder().defaultSchema(schema) + .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build(); + } else { + return Frameworks.newConfigBuilder().defaultSchema(schema).build(); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java new file mode 100644 index 0000000..5dcd7d1 --- /dev/null +++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java @@ -0,0 +1,68 @@ +package org.apache.storm.sql.parser; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +import java.util.List; + +public class SqlCreateFunction extends SqlCall { + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator( + "CREATE_FUNCTION", SqlKind.OTHER) { + @Override + public SqlCall createCall( + SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) { + assert functionQualifier == null; + return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1]); + } + + @Override + public void unparse( + SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) { + SqlCreateFunction t = (SqlCreateFunction) call; + UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec); + u.keyword("CREATE", "FUNCTION").node(t.functionName).keyword("AS").node(t.className); + } + }; + + private final SqlIdentifier functionName; + private final SqlNode className; + + public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className) { + super(pos); + this.functionName = functionName; + this.className = className; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(functionName, className); + } + + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + getOperator().unparse(writer, this, leftPrec, rightPrec); + } + + public String functionName() { 
+ return functionName.toString(); + } + + public String className() { + return ((NlsString)SqlLiteral.value(className)).getValue(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java index a85a907..ce1e27f 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java @@ -18,8 +18,13 @@ package org.apache.storm.sql; import com.google.common.collect.ImmutableMap; +import org.apache.storm.sql.runtime.ChannelHandler; +import org.apache.storm.sql.runtime.DataSource; +import org.apache.storm.sql.runtime.DataSourcesProvider; +import org.apache.storm.sql.runtime.DataSourcesRegistry; +import org.apache.storm.sql.runtime.FieldInfo; +import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.tuple.Values; -import org.apache.storm.sql.runtime.*; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -129,4 +134,20 @@ public class TestStormSql { sql.execute(stmt, h); Assert.assertEquals(0, values.size()); } + + @Test + public void testExternalUdf() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'"); + stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'"); + stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + Assert.assertEquals(2, values.size()); + Assert.assertEquals(4, values.get(0).get(0)); + 
Assert.assertEquals(5, values.get(1).get(0)); + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java index b957565..68054d8 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java @@ -41,6 +41,12 @@ public class TestSqlParser { parse(sql); } + @Test + public void testCreateFunction() throws Exception { + String sql = "CREATE FUNCTION foo AS 'org.apache.storm.sql.MyUDF'"; + parse(sql); + } + private static SqlNode parse(String sql) throws Exception { StormParser parser = new StormParser(sql); return parser.impl().parseSqlStmtEof(); http://git-wip-us.apache.org/repos/asf/storm/blob/0047279a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java index da763a7..5091e3a 100644 --- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java +++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java @@ -41,6 +41,12 @@ import java.util.List; import java.util.Map; public class TestUtils { + public static class MyPlus { + public static Integer evaluate(Integer x, Integer y) { + return x + y; + } + } + public static class MockDataSource implements DataSource { private final ArrayList<Values> RECORDS = new ArrayList<>();
