Repository: storm Updated Branches: refs/heads/1.x-branch a8522bed5 -> ddc3c0458
Merge branch 'STORM-1585' of https://github.com/arunmahadevan/storm into STORM-1585 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/291bad33 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/291bad33 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/291bad33 Branch: refs/heads/1.x-branch Commit: 291bad3391f2a403fc796e225fe6a085a0be9dd8 Parents: a8522be Author: Sriharsha Chintalapani <[email protected]> Authored: Thu Apr 14 08:20:42 2016 -0700 Committer: Sriharsha Chintalapani <[email protected]> Committed: Thu Apr 14 10:29:30 2016 -0700 ---------------------------------------------------------------------- .../storm-sql-core/src/codegen/data/Parser.tdd | 4 +- .../src/codegen/includes/parserImpls.ftl | 25 ++++++ .../jvm/org/apache/storm/sql/StormSqlImpl.java | 44 +++++++-- .../apache/storm/sql/compiler/ExprCompiler.java | 26 +++++- .../storm/sql/parser/SqlCreateFunction.java | 94 ++++++++++++++++++++ .../test/org/apache/storm/sql/TestStormSql.java | 67 +++++++++++++- .../storm/sql/compiler/TestCompilerUtils.java | 15 +++- .../backends/standalone/TestPlanCompiler.java | 16 ++++ .../apache/storm/sql/parser/TestSqlParser.java | 6 ++ .../test/org/apache/storm/sql/TestUtils.java | 6 ++ 10 files changed, 289 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/codegen/data/Parser.tdd ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd index db3a675..1e921c7 100644 --- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd +++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd @@ -34,11 +34,13 @@ "OUTPUTFORMAT", "STORED", "TBLPROPERTIES", + "JAR" ] # List of methods for parsing custom SQL statements. statementParserMethods: [ - "SqlCreateTable()" + "SqlCreateTable()", + "SqlCreateFunction()" ] # List of methods for parsing custom literals. http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl index 72a8546..0013231 100644 --- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl +++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl @@ -83,4 +83,29 @@ SqlNode SqlCreateTable() : input_format_class_name, output_format_class_name, location, tbl_properties, select); } +} + +/** + * CREATE FUNCTION functionname AS 'classname' + */ +SqlNode SqlCreateFunction() : +{ + SqlParserPos pos; + SqlIdentifier functionName; + SqlNode className; + SqlNode jarName = null; +} +{ + <CREATE> { pos = getPos(); } + <FUNCTION> + functionName = CompoundIdentifier() + <AS> + className = StringLiteral() + [ + <USING> <JAR> + jarName = StringLiteral() + ] + { + return new SqlCreateFunction(pos, functionName, className, jarName); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java index 7e5dfcc..bf5a921 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java @@ -17,6 +17,12 @@ */ package org.apache.storm.sql; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.SubmitOptions; import org.apache.calcite.adapter.java.JavaTypeFactory; @@ -34,6 +40,7 @@ import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler; import org.apache.storm.sql.javac.CompilingClassLoader; import org.apache.storm.sql.parser.ColumnConstraint; import org.apache.storm.sql.parser.ColumnDefinition; +import org.apache.storm.sql.parser.SqlCreateFunction; import org.apache.storm.sql.parser.SqlCreateTable; import org.apache.storm.sql.parser.StormParser; import org.apache.storm.sql.runtime.*; @@ -47,6 +54,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +69,7 @@ class StormSqlImpl extends StormSql { private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT); private final SchemaPlus schema = Frameworks.createRootSchema(true); + private boolean hasUdf = false; @Override public void execute( @@ -72,9 +81,10 @@ class StormSqlImpl extends StormSql { SqlNode node = parser.impl().parseSqlStmtEof(); if (node instanceof SqlCreateTable) { handleCreateTable((SqlCreateTable) node, dataSources); + } else if (node instanceof SqlCreateFunction) { + handleCreateFunction((SqlCreateFunction) node); } else { - FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema( - schema).build(); + FrameworkConfig config = buildFrameWorkConfig(); Planner planner = Frameworks.getPlanner(config); SqlNode parse = planner.parse(sql); SqlNode validate = planner.validate(parse); @@ -97,9 +107,10 @@ class StormSqlImpl extends StormSql { SqlNode node = parser.impl().parseSqlStmtEof(); if (node instanceof SqlCreateTable) { handleCreateTableForTrident((SqlCreateTable) node, dataSources); - } else { - FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema( - schema).build(); + } else if (node instanceof SqlCreateFunction) { + handleCreateFunction((SqlCreateFunction) node); + } else { + FrameworkConfig config = buildFrameWorkConfig(); Planner planner = Frameworks.getPlanner(config); SqlNode parse = planner.parse(sql); SqlNode validate = planner.validate(parse); @@ -153,6 +164,15 @@ class StormSqlImpl extends StormSql { dataSources.put(n.tableName(), ds); } + private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException { + if(sqlCreateFunction.jarName() != null) { + throw new UnsupportedOperationException("UDF 'USING JAR' not implemented"); + } + schema.add(sqlCreateFunction.functionName().toUpperCase(), + ScalarFunctionImpl.create(Class.forName(sqlCreateFunction.className()), "evaluate")); + hasUdf = true; + } + private void handleCreateTableForTrident( SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) { List<FieldInfo> fields = updateSchema(n); @@ -184,4 +204,18 @@ class StormSqlImpl extends StormSql { schema.add(n.tableName(), table); return fields; } + + private FrameworkConfig buildFrameWorkConfig() { + if (hasUdf) { + List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); + sqlOperatorTables.add(SqlStdOperatorTable.instance()); + sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), + false, + Collections.<String>emptyList(), typeFactory)); + return Frameworks.newConfigBuilder().defaultSchema(schema) + .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build(); + } else { + return Frameworks.newConfigBuilder().defaultSchema(schema).build(); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java index c43c32f..df0c27f 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java @@ -208,9 +208,22 @@ public class ExprCompiler implements RexVisitor<String> { this.translators = builder.build(); } + private CallExprPrinter getCallExprPrinter(SqlOperator op) { + if (translators.containsKey(op)) { + return translators.get(op); + } else if (op instanceof SqlUserDefinedFunction) { + Function function = ((SqlUserDefinedFunction) op).getFunction(); + if (function instanceof ReflectiveFunctionBase) { + Method method = ((ReflectiveFunctionBase) function).method; + return methodCall(op, method, NullPolicy.STRICT).getValue(); + } + } + return null; + } + private String compile(ExprCompiler compiler, RexCall call) { SqlOperator op = call.getOperator(); - CallExprPrinter printer = translators.get(op); + CallExprPrinter printer = getCallExprPrinter(op); if (printer == null) { throw new UnsupportedOperationException(); } else { @@ -218,8 +231,8 @@ public class ExprCompiler implements RexVisitor<String> { } } - private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod( - final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) { + private Map.Entry<SqlOperator, CallExprPrinter> methodCall( + final SqlOperator op, final Method method, NullPolicy nullPolicy) { if (nullPolicy != NullPolicy.STRICT) { throw new UnsupportedOperationException(); } @@ -240,7 +253,7 @@ public class ExprCompiler implements RexVisitor<String> { pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, arg)); } } - String calc = printMethodCall(method.method, args); + String calc = printMethodCall(method, args); pw.print(String.format("else { %1$s = %2$s; }\n", val, calc)); return val; } @@ -248,6 +261,11 @@ public class ExprCompiler implements RexVisitor<String> { return new AbstractMap.SimpleImmutableEntry<>(op, printer); } + private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod( + final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) { + return methodCall(op, method.method, nullPolicy); + } + private Map.Entry<SqlOperator, CallExprPrinter> infixBinary (final SqlOperator op, final String javaOperator, final Class<?> clazz, final String backupMethodName) { CallExprPrinter trans = new CallExprPrinter() { http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java new file mode 100644 index 0000000..a53802c --- /dev/null +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.sql.parser; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +import java.util.List; + +public class SqlCreateFunction extends SqlCall { + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator( + "CREATE_FUNCTION", SqlKind.OTHER) { + @Override + public SqlCall createCall( + SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) { + assert functionQualifier == null; + return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1], o[2]); + } + + @Override + public void unparse( + SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) { + SqlCreateFunction t = (SqlCreateFunction) call; + UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec); + u.keyword("CREATE", "FUNCTION").node(t.functionName).keyword("AS").node(t.className); + if (t.jarName != null) { + u.keyword("USING", "JAR").node(t.jarName); + } + } + }; + + private final SqlIdentifier functionName; + private final SqlNode className; + private final SqlNode jarName; + + public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className, SqlNode jarName) { + super(pos); + this.functionName = functionName; + this.className = className; + this.jarName = jarName; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(functionName, className); + } + + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + getOperator().unparse(writer, this, leftPrec, rightPrec); + } + + public String functionName() { + return functionName.toString(); + } + + public String className() { + return ((NlsString)SqlLiteral.value(className)).getValue(); + } + + public String jarName() { + return jarName == null ? null : ((NlsString)SqlLiteral.value(jarName)).getValue(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java index a85a907..872b83d 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> + * * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,8 +18,15 @@ package org.apache.storm.sql; import com.google.common.collect.ImmutableMap; +import org.apache.calcite.sql.validate.SqlValidatorException; +import org.apache.calcite.tools.ValidationException; +import org.apache.storm.sql.runtime.ChannelHandler; +import org.apache.storm.sql.runtime.DataSource; +import org.apache.storm.sql.runtime.DataSourcesProvider; +import org.apache.storm.sql.runtime.DataSourcesRegistry; +import org.apache.storm.sql.runtime.FieldInfo; +import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.tuple.Values; -import org.apache.storm.sql.runtime.*; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -129,4 +136,58 @@ public class TestStormSql { sql.execute(stmt, h); Assert.assertEquals(0, values.size()); } + + @Test(expected = ValidationException.class) + public void testExternalUdfType() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'"); + stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'"); + stmt.add("SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + System.out.println(values); + + } + + @Test + public void testExternalUdfType2() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'"); + stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'"); + stmt.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + Assert.assertEquals(0, values.size()); + } + + @Test + public void testExternalUdf() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'"); + stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'"); + stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + Assert.assertEquals(2, values.size()); + Assert.assertEquals(4, values.get(0).get(0)); + Assert.assertEquals(5, values.get(1).get(0)); + } + + @Test(expected = UnsupportedOperationException.class) + public void testExternalUdfUsingJar() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'"); + stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'"); + stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java index 43b54f7..8a14eee 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java @@ -46,6 +46,12 @@ import java.util.List; public class TestCompilerUtils { + public static class MyPlus { + public static Integer eval(Integer x, Integer y) { + return x + y; + } + } + public static CalciteState sqlOverDummyTable(String sql) throws RelConversionException, ValidationException, SqlParseException { SchemaPlus schema = Frameworks.createRootSchema(true); @@ -83,8 +89,15 @@ public class TestCompilerUtils { Table table = streamableTable.stream(); schema.add("FOO", table); schema.add("BAR", table); + schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval")); + List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); + sqlOperatorTables.add(SqlStdOperatorTable.instance()); + sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), + false, + Collections.<String>emptyList(), typeFactory)); + SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables); FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema( - schema).build(); + schema).operatorTable(chainedSqlOperatorTable).build(); Planner planner = Frameworks.getPlanner(config); SqlNode parse = planner.parse(sql); SqlNode validate = planner.validate(parse); http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java index 414aeee..547114f 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java @@ -86,4 +86,20 @@ public class TestPlanCompiler { Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map); Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0)); } + + @Test + public void testUdf() throws Exception { + String sql = "SELECT MYPLUS(ID, 3)" + + "FROM FOO " + + "WHERE ID = 2"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql); + PlanCompiler compiler = new PlanCompiler(typeFactory); + AbstractValuesProcessor proc = compiler.compile(state.tree()); + Map<String, DataSource> data = new HashMap<>(); + data.put("FOO", new TestUtils.MockDataSource()); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + proc.initialize(data, h); + Assert.assertEquals(new Values(5), values.get(0)); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java index b957565..68054d8 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java @@ -41,6 +41,12 @@ public class TestSqlParser { parse(sql); } + @Test + public void testCreateFunction() throws Exception { + String sql = "CREATE FUNCTION foo AS 'org.apache.storm.sql.MyUDF'"; + parse(sql); + } + private static SqlNode parse(String sql) throws Exception { StormParser parser = new StormParser(sql); return parser.impl().parseSqlStmtEof(); http://git-wip-us.apache.org/repos/asf/storm/blob/291bad33/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java index da763a7..5091e3a 100644 --- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java +++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java @@ -41,6 +41,12 @@ import java.util.List; import java.util.Map; public class TestUtils { + public static class MyPlus { + public static Integer evaluate(Integer x, Integer y) { + return x + y; + } + } + public static class MockDataSource implements DataSource { private final ArrayList<Values> RECORDS = new ArrayList<>();
