STORM-2845 Drop standalone mode of Storm SQL * remove all related interfaces/classes on standalone mode * migrate tests to trident mode which are associated to standalone mode * remove comments from tests which Calcite has fixed the issues
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1dcd12de Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1dcd12de Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1dcd12de Branch: refs/heads/master Commit: 1dcd12de27b813a8c3e42d600028fa97df0a5f15 Parents: 4495f66 Author: Jungtaek Lim <[email protected]> Authored: Wed Dec 6 19:52:40 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu Dec 7 09:59:28 2017 +0900 ---------------------------------------------------------------------- .../src/jvm/org/apache/storm/sql/StormSql.java | 8 - .../org/apache/storm/sql/StormSqlContext.java | 172 +++++ .../jvm/org/apache/storm/sql/StormSqlImpl.java | 180 +---- .../standalone/BuiltinAggregateFunctions.java | 238 ------- .../backends/standalone/PlanCompiler.java | 139 ---- .../standalone/PostOrderRelNodeVisitor.java | 132 ---- .../backends/standalone/RelNodeCompiler.java | 484 ------------- .../test/org/apache/storm/sql/SqlTestUtil.java | 65 ++ .../storm/sql/StormSqlLocalClusterImpl.java | 80 +++ .../test/org/apache/storm/sql/TestStormSql.java | 676 +++++++------------ .../storm/sql/compiler/TestExprSemantic.java | 410 ----------- .../backends/standalone/TestCompilerUtils.java | 183 ----- .../backends/standalone/TestPlanCompiler.java | 104 --- .../standalone/TestRelNodeCompiler.java | 64 -- .../backends/trident/TestExpressions.java | 359 ++++++++++ .../backends/trident/TestPlanCompiler.java | 119 ++-- .../storm/sql/hdfs/HdfsDataSourcesProvider.java | 7 - .../sql/kafka/KafkaDataSourcesProvider.java | 7 - .../sql/mongodb/MongoDataSourcesProvider.java | 7 - .../sql/redis/RedisDataSourcesProvider.java | 6 - .../sql/runtime/AbstractChannelHandler.java | 53 -- .../sql/runtime/AbstractValuesProcessor.java | 45 -- .../storm/sql/runtime/ChannelContext.java | 34 - .../storm/sql/runtime/ChannelHandler.java | 42 -- .../org/apache/storm/sql/runtime/Channels.java | 110 --- .../apache/storm/sql/runtime/DataSource.java | 28 - .../storm/sql/runtime/DataSourcesProvider.java | 14 - .../storm/sql/runtime/DataSourcesRegistry.java | 19 - .../storm/sql/runtime/StormSqlFunctions.java | 48 -- .../socket/SocketDataSourcesProvider.java | 6 - .../test/org/apache/storm/sql/TestUtils.java | 190 ++---- 31 files changed, 1048 insertions(+), 2981 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java index 3fbc463..49e4162 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java @@ -19,7 +19,6 @@ package org.apache.storm.sql; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.SubmitOptions; -import org.apache.storm.sql.runtime.ChannelHandler; import java.util.Map; @@ -33,13 +32,6 @@ import java.util.Map; */ public abstract class StormSql { /** - * Execute the SQL statements in stand-alone mode. The user can retrieve the result by passing in an instance - * of {@see ChannelHandler}. - */ - public abstract void execute(Iterable<String> statements, - ChannelHandler handler) throws Exception; - - /** * Submit the SQL statements to Nimbus and run it as a topology. */ public abstract void submit( http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java new file mode 100644 index 0000000..bf7ef3e --- /dev/null +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AggregateFunctionImpl; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.apache.storm.sql.compiler.CompilerUtil; +import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl; +import org.apache.storm.sql.parser.ColumnConstraint; +import org.apache.storm.sql.parser.ColumnDefinition; +import org.apache.storm.sql.parser.SqlCreateFunction; +import org.apache.storm.sql.parser.SqlCreateTable; +import org.apache.storm.sql.planner.StormRelUtils; +import org.apache.storm.sql.planner.trident.QueryPlanner; +import org.apache.storm.sql.runtime.DataSourcesRegistry; +import org.apache.storm.sql.runtime.FieldInfo; +import org.apache.storm.sql.runtime.ISqlTridentDataSource; + +public class StormSqlContext { + private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl( + RelDataTypeSystem.DEFAULT); + private final SchemaPlus schema = Frameworks.createRootSchema(true); + private boolean hasUdf = false; + private Map<String, ISqlTridentDataSource> dataSources = new HashMap<>(); + + public void interpretCreateTable(SqlCreateTable n) { + CompilerUtil.TableBuilderInfo builder = new CompilerUtil.TableBuilderInfo(typeFactory); + List<FieldInfo> fields = new ArrayList<>(); + for (ColumnDefinition col : n.fieldList()) { + builder.field(col.name(), col.type(), col.constraint()); + RelDataType dataType = col.type().deriveType(typeFactory); + Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType); + ColumnConstraint constraint = col.constraint(); + boolean isPrimary = constraint != null && constraint instanceof ColumnConstraint.PrimaryKey; + fields.add(new FieldInfo(col.name(), javaType, isPrimary)); + } + + if (n.parallelism() != null) { + builder.parallelismHint(n.parallelism()); + } + Table table = builder.build(); + schema.add(n.tableName(), table); + + ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n + .inputFormatClass(), n.outputFormatClass(), n.properties(), fields); + if (ds == null) { + throw new RuntimeException("Failed to find data source for " + n + .tableName() + " URI: " + n.location()); + } else if (dataSources.containsKey(n.tableName())) { + throw new RuntimeException("Duplicated definition for table " + n + .tableName()); + } + dataSources.put(n.tableName(), ds); + } + + public void interpretCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException { + if(sqlCreateFunction.jarName() != null) { + throw new UnsupportedOperationException("UDF 'USING JAR' not implemented"); + } + Method method; + Function function; + if ((method=findMethod(sqlCreateFunction.className(), "evaluate")) != null) { + function = ScalarFunctionImpl.create(method); + } else if (findMethod(sqlCreateFunction.className(), "add") != null) { + function = AggregateFunctionImpl.create(Class.forName(sqlCreateFunction.className())); + } else { + throw new RuntimeException("Invalid scalar or aggregate function"); + } + schema.add(sqlCreateFunction.functionName().toUpperCase(), function); + hasUdf = true; + } + + public AbstractTridentProcessor compileSql(String query) throws Exception { + QueryPlanner planner = new QueryPlanner(schema); + return planner.compile(dataSources, query); + } + + public String explain(String query) throws SqlParseException, ValidationException, RelConversionException { + FrameworkConfig config = buildFrameWorkConfig(); + Planner planner = Frameworks.getPlanner(config); + SqlNode parse = planner.parse(query); + SqlNode validate = planner.validate(parse); + RelNode tree = planner.convert(validate); + + return StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES); + } + + public FrameworkConfig buildFrameWorkConfig() { + if (hasUdf) { + List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); + sqlOperatorTables.add(SqlStdOperatorTable.instance()); + sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), + false, + Collections.<String>emptyList(), typeFactory)); + return Frameworks.newConfigBuilder().defaultSchema(schema) + .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build(); + } else { + return Frameworks.newConfigBuilder().defaultSchema(schema).build(); + } + } + + public JavaTypeFactory getTypeFactory() { + return typeFactory; + } + + public SchemaPlus getSchema() { + return schema; + } + + public boolean isHasUdf() { + return hasUdf; + } + + public Map<String, ISqlTridentDataSource> getDataSources() { + return dataSources; + } + + private Method findMethod(String clazzName, String methodName) throws ClassNotFoundException { + Class<?> clazz = Class.forName(clazzName); + for (Method method : clazz.getMethods()) { + if (method.getName().equals(methodName)) { + return method; + } + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java index 1c83ac8..a771793 100644 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java @@ -17,55 +17,21 @@ */ package org.apache.storm.sql; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.schema.Function; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AggregateFunctionImpl; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlOperatorTable; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.util.ChainedSqlOperatorTable; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.Planner; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.SubmitOptions; -import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl; -import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler; import org.apache.storm.sql.javac.CompilingClassLoader; -import org.apache.storm.sql.parser.ColumnConstraint; -import org.apache.storm.sql.parser.ColumnDefinition; import org.apache.storm.sql.parser.SqlCreateFunction; import org.apache.storm.sql.parser.SqlCreateTable; import org.apache.storm.sql.parser.StormParser; -import org.apache.storm.sql.planner.StormRelUtils; -import org.apache.storm.sql.planner.trident.QueryPlanner; -import org.apache.storm.sql.runtime.AbstractValuesProcessor; -import org.apache.storm.sql.runtime.ChannelHandler; -import org.apache.storm.sql.runtime.DataSource; -import org.apache.storm.sql.runtime.DataSourcesRegistry; -import org.apache.storm.sql.runtime.FieldInfo; -import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.trident.TridentTopology; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.jar.Attributes; @@ -73,37 +39,11 @@ import java.util.jar.JarOutputStream; import java.util.jar.Manifest; import java.util.zip.ZipEntry; -import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo; - class StormSqlImpl extends StormSql { - private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl( - RelDataTypeSystem.DEFAULT); - private final SchemaPlus schema = Frameworks.createRootSchema(true); - private boolean hasUdf = false; + private final StormSqlContext sqlContext; - @Override - public void execute( - Iterable<String> statements, ChannelHandler result) - throws Exception { - Map<String, DataSource> dataSources = new HashMap<>(); - for (String sql : statements) { - StormParser parser = new StormParser(sql); - SqlNode node = parser.impl().parseSqlStmtEof(); - if (node instanceof SqlCreateTable) { - handleCreateTable((SqlCreateTable) node, dataSources); - } else if (node instanceof SqlCreateFunction) { - handleCreateFunction((SqlCreateFunction) node); - } else { - FrameworkConfig config = buildFrameWorkConfig(); - Planner planner = Frameworks.getPlanner(config); - SqlNode parse = planner.parse(sql); - SqlNode validate = planner.validate(parse); - RelNode tree = planner.convert(validate); - PlanCompiler compiler = new PlanCompiler(typeFactory); - AbstractValuesProcessor proc = compiler.compile(tree); - proc.initialize(dataSources, result); - } - } + public StormSqlImpl() { + sqlContext = new StormSqlContext(); } @Override @@ -111,17 +51,15 @@ class StormSqlImpl extends StormSql { String name, Iterable<String> statements, Map<String, Object> topoConf, SubmitOptions opts, StormSubmitter.ProgressListener progressListener, String asUser) throws Exception { - Map<String, ISqlTridentDataSource> dataSources = new HashMap<>(); for (String sql : statements) { StormParser parser = new StormParser(sql); SqlNode node = parser.impl().parseSqlStmtEof(); if (node instanceof SqlCreateTable) { - handleCreateTableForTrident((SqlCreateTable) node, dataSources); + sqlContext.interpretCreateTable((SqlCreateTable) node); } else if (node instanceof SqlCreateFunction) { - handleCreateFunction((SqlCreateFunction) node); - } else { - QueryPlanner planner = new QueryPlanner(schema); - AbstractTridentProcessor processor = planner.compile(dataSources, sql); + sqlContext.interpretCreateFunction((SqlCreateFunction) node); + } else { + AbstractTridentProcessor processor = sqlContext.compileSql(sql); TridentTopology topo = processor.build(); Path jarPath = null; @@ -145,7 +83,6 @@ class StormSqlImpl extends StormSql { @Override public void explain(Iterable<String> statements) throws Exception { - Map<String, ISqlTridentDataSource> dataSources = new HashMap<>(); for (String sql : statements) { StormParser parser = new StormParser(sql); SqlNode node = parser.impl().parseSqlStmtEof(); @@ -156,19 +93,13 @@ class StormSqlImpl extends StormSql { System.out.println("-----------------------------------------------------------"); if (node instanceof SqlCreateTable) { - handleCreateTableForTrident((SqlCreateTable) node, dataSources); + sqlContext.interpretCreateTable((SqlCreateTable) node); System.out.println("No plan presented on DDL"); } else if (node instanceof SqlCreateFunction) { - handleCreateFunction((SqlCreateFunction) node); + sqlContext.interpretCreateFunction((SqlCreateFunction) node); System.out.println("No plan presented on DDL"); } else { - FrameworkConfig config = buildFrameWorkConfig(); - Planner planner = Frameworks.getPlanner(config); - SqlNode parse = planner.parse(sql); - SqlNode validate = planner.validate(parse); - RelNode tree = planner.convert(validate); - - String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES); + String plan = sqlContext.explain(sql); System.out.println("plan>"); System.out.println(plan); } @@ -196,95 +127,4 @@ class StormSqlImpl extends StormSql { } } } - - private void handleCreateTable( - SqlCreateTable n, Map<String, DataSource> dataSources) { - List<FieldInfo> fields = updateSchema(n); - DataSource ds = DataSourcesRegistry.construct(n.location(), n - .inputFormatClass(), n.outputFormatClass(), fields); - if (ds == null) { - throw new RuntimeException("Cannot construct data source for " + n - .tableName()); - } else if (dataSources.containsKey(n.tableName())) { - throw new RuntimeException("Duplicated definition for table " + n - .tableName()); - } - dataSources.put(n.tableName(), ds); - } - - private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException { - if(sqlCreateFunction.jarName() != null) { - throw new UnsupportedOperationException("UDF 'USING JAR' not implemented"); - } - Method method; - Function function; - if ((method=findMethod(sqlCreateFunction.className(), "evaluate")) != null) { - function = ScalarFunctionImpl.create(method); - } else if (findMethod(sqlCreateFunction.className(), "add") != null) { - function = AggregateFunctionImpl.create(Class.forName(sqlCreateFunction.className())); - } else { - throw new RuntimeException("Invalid scalar or aggregate function"); - } - schema.add(sqlCreateFunction.functionName().toUpperCase(), function); - hasUdf = true; - } - - private Method findMethod(String clazzName, String methodName) throws ClassNotFoundException { - Class<?> clazz = Class.forName(clazzName); - for (Method method : clazz.getMethods()) { - if (method.getName().equals(methodName)) { - return method; - } - } - return null; - } - - private void handleCreateTableForTrident( - SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) { - List<FieldInfo> fields = updateSchema(n); - ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n - .inputFormatClass(), n.outputFormatClass(), n.properties(), fields); - if (ds == null) { - throw new RuntimeException("Failed to find data source for " + n - .tableName() + " URI: " + n.location()); - } else if (dataSources.containsKey(n.tableName())) { - throw new RuntimeException("Duplicated definition for table " + n - .tableName()); - } - dataSources.put(n.tableName(), ds); - } - - private List<FieldInfo> updateSchema(SqlCreateTable n) { - TableBuilderInfo builder = new TableBuilderInfo(typeFactory); - List<FieldInfo> fields = new ArrayList<>(); - for (ColumnDefinition col : n.fieldList()) { - builder.field(col.name(), col.type(), col.constraint()); - RelDataType dataType = col.type().deriveType(typeFactory); - Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType); - ColumnConstraint constraint = col.constraint(); - boolean isPrimary = constraint != null && constraint instanceof ColumnConstraint.PrimaryKey; - fields.add(new FieldInfo(col.name(), javaType, isPrimary)); - } - - if (n.parallelism() != null) { - builder.parallelismHint(n.parallelism()); - } - Table table = builder.build(); - schema.add(n.tableName(), table); - return fields; - } - - private FrameworkConfig buildFrameWorkConfig() { - if (hasUdf) { - List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); - sqlOperatorTables.add(SqlStdOperatorTable.instance()); - sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), - false, - Collections.<String>emptyList(), typeFactory)); - return Frameworks.newConfigBuilder().defaultSchema(schema) - .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build(); - } else { - return Frameworks.newConfigBuilder().defaultSchema(schema).build(); - } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java deleted file mode 100644 index 9dc4ba8..0000000 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java +++ /dev/null @@ -1,238 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.sql.compiler.backends.standalone; - -import com.google.common.collect.ImmutableList; -import org.apache.storm.tuple.Values; - -import java.lang.reflect.Type; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Built-in implementations for some of the standard aggregation operations. - * Aggregations can be implemented as a class with the following methods viz. init, add and result. - * The class could contain only static methods, only non-static methods or be generic. - */ -public class BuiltinAggregateFunctions { - // binds the type information and the class implementing the aggregation - public static class TypeClass { - public static class GenericType { - } - - public final Type ty; - public final Class<?> clazz; - - private TypeClass(Type ty, Class<?> clazz) { - this.ty = ty; - this.clazz = clazz; - } - - static TypeClass of(Type ty, Class<?> clazz) { - return new TypeClass(ty, clazz); - } - } - - static final Map<String, List<TypeClass>> TABLE = new HashMap<>(); - - public static class ByteSum { - public static Byte init() { - return 0; - } - - public static Byte add(Byte accumulator, Byte val) { - return (byte) (accumulator + val); - } - - public static Byte result(Byte accumulator) { - return accumulator; - } - } - - public static class ShortSum { - public static Short init() { - return 0; - } - - public static Short add(Short accumulator, Short val) { - return (short) (accumulator + val); - } - - public static Short result(Short accumulator) { - return accumulator; - } - } - - public static class IntSum { - public static Integer init() { - return 0; - } - - public static Integer add(Integer accumulator, Integer val) { - return accumulator + val; - } - - public static Integer result(Integer accumulator) { - return accumulator; - } - } - - public static class LongSum { - public static Long init() { - return 0L; - } - - public static Long add(Long accumulator, Long val) { - return accumulator + val; - } - - public static Long result(Long accumulator) { - return accumulator; - } - } - - public static class FloatSum { - public static Float init() { - return 0.0f; - } - - public static Float add(Float accumulator, Float val) { - return accumulator + val; - } - - public static Float result(Float accumulator) { - return accumulator; - } - } - - public static class DoubleSum { - public static Double init() { - return 0.0; - } - - public static Double add(Double accumulator, Double val) { - return accumulator + val; - } - - public static Double result(Double accumulator) { - return accumulator; - } - } - - public static class Max<T extends Comparable<T>> { - public T init() { - return null; - } - - public T add(T accumulator, T val) { - return (accumulator == null || accumulator.compareTo(val) < 0) ? val : accumulator; - } - - public T result(T accumulator) { - return accumulator; - } - } - - public static class Min<T extends Comparable<T>> { - public T init() { - return null; - } - - public T add(T accumulator, T val) { - return (accumulator == null || accumulator.compareTo(val) > 0) ? val : accumulator; - } - - public T result(T accumulator) { - return accumulator; - } - } - - public static class IntAvg { - private int count; - - public Integer init() { - return 0; - } - - public Integer add(Integer accumulator, Integer val) { - ++count; - return accumulator + val; - } - - public Integer result(Integer accumulator) { - Integer result = accumulator / count; - count = 0; - return result; - } - } - - public static class DoubleAvg { - private int count; - - public Double init() { - return 0.0; - } - - public Double add(Double accumulator, Double val) { - ++count; - return accumulator + val; - } - - public Double result(Double accumulator) { - Double result = accumulator / count; - count = 0; - return result; - } - } - - public static class Count { - public static Long init() { - return 0L; - } - - public static Long add(Long accumulator, Values vals) { - for (Object val : vals) { - if (val == null) { - return accumulator; - } - } - return accumulator + 1; - } - - public static Long result(Long accumulator) { - return accumulator; - } - } - - static { - TABLE.put("SUM", ImmutableList.of( - TypeClass.of(float.class, FloatSum.class), - TypeClass.of(double.class, DoubleSum.class), - TypeClass.of(byte.class, ByteSum.class), - TypeClass.of(short.class, ShortSum.class), - TypeClass.of(long.class, LongSum.class), - TypeClass.of(int.class, IntSum.class))); - TABLE.put("AVG", ImmutableList.of( - TypeClass.of(double.class, DoubleAvg.class), - TypeClass.of(int.class, IntAvg.class))); - TABLE.put("COUNT", ImmutableList.of(TypeClass.of(long.class, Count.class))); - TABLE.put("MAX", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Max.class))); - TABLE.put("MIN", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Min.class))); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java deleted file mode 100644 index 01546ed..0000000 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.sql.compiler.backends.standalone; - -import com.google.common.base.Joiner; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.TableScan; -import org.apache.storm.sql.compiler.CompilerUtil; -import org.apache.storm.sql.javac.CompilingClassLoader; -import org.apache.storm.sql.runtime.AbstractValuesProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.HashSet; -import java.util.Set; - -public class PlanCompiler { - private static final Logger LOG = LoggerFactory.getLogger(PlanCompiler.class); - - private static final Joiner NEW_LINE_JOINER = Joiner.on("\n"); - private static final String PACKAGE_NAME = "org.apache.storm.sql.generated"; - private static final String PROLOGUE = NEW_LINE_JOINER.join( - "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "", - "import java.util.Iterator;", "import java.util.Map;", "import java.util.HashMap;", - "import java.util.List;", "import java.util.ArrayList;", - "import java.util.LinkedHashMap;", - "import org.apache.storm.tuple.Values;", - "import org.apache.storm.sql.runtime.AbstractChannelHandler;", - "import org.apache.storm.sql.runtime.Channels;", - "import org.apache.storm.sql.runtime.ChannelContext;", - "import org.apache.storm.sql.runtime.ChannelHandler;", - "import org.apache.storm.sql.runtime.DataSource;", - "import org.apache.storm.sql.runtime.AbstractValuesProcessor;", - "import com.google.common.collect.ArrayListMultimap;", - "import com.google.common.collect.Multimap;", - "import org.apache.calcite.interpreter.Context;", - "import org.apache.calcite.interpreter.StormContext;", - "import org.apache.calcite.DataContext;", - "import org.apache.storm.sql.runtime.calcite.StormDataContext;", - "public final class Processor extends AbstractValuesProcessor {", - " public final static DataContext dataContext = new StormDataContext();", - ""); - private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join( - " @Override", - " public void initialize(Map<String, DataSource> data,", - " ChannelHandler result) {", - " ChannelContext r = Channels.chain(Channels.voidContext(), result);", - "" - ); - - private final JavaTypeFactory typeFactory; - - public PlanCompiler(JavaTypeFactory typeFactory) { - this.typeFactory = typeFactory; - } - - private String generateJavaSource(RelNode root) throws Exception { - StringWriter sw = new StringWriter(); - try (PrintWriter pw = new PrintWriter(sw)) { - RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory); - printPrologue(pw); - compiler.traverse(root); - printMain(pw, root); - printEpilogue(pw); - } - return sw.toString(); - } - - private void printMain(PrintWriter pw, RelNode root) { - Set<TableScan> tables = new HashSet<>(); - pw.print(INITIALIZER_PROLOGUE); - chainOperators(pw, root, tables); - for (TableScan n : tables) { - String escaped = CompilerUtil.escapeJavaString( - Joiner.on('.').join(n.getTable().getQualifiedName()), true); - String r = NEW_LINE_JOINER.join( - " if (!data.containsKey(%1$s))", - " throw new RuntimeException(\"Cannot find table \" + %1$s);", - " data.get(%1$s).open(CTX_%2$d);", - ""); - pw.print(String.format(r, escaped, n.getId())); - } - pw.print(" }\n"); - } - - private void chainOperators(PrintWriter pw, RelNode root, Set<TableScan> tables) { - doChainOperators(pw, root, tables, "r"); - } - - private void doChainOperators(PrintWriter pw, RelNode node, Set<TableScan> tables, String parentCtx) { - pw.print( - String.format(" ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n", - node.getId(), parentCtx, RelNodeCompiler.getStageName(node))); - String currentCtx = String.format("CTX_%d", node.getId()); - if (node instanceof TableScan) { - tables.add((TableScan) node); - } - for (RelNode i : node.getInputs()) { - doChainOperators(pw, i, tables, currentCtx); - } - } - - public AbstractValuesProcessor compile(RelNode plan) throws Exception { - String javaCode = generateJavaSource(plan); - LOG.debug("Compiling... source code {}", javaCode); - ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(), - PACKAGE_NAME + ".Processor", - javaCode, null); - return (AbstractValuesProcessor) cl.loadClass( - PACKAGE_NAME + ".Processor").newInstance(); - } - - private static void printEpilogue( - PrintWriter pw) throws Exception { - pw.print("}\n"); - } - - private static void printPrologue(PrintWriter pw) { - pw.append(PROLOGUE); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java deleted file mode 100644 index afed8a9..0000000 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.sql.compiler.backends.standalone; - -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.*; -import org.apache.calcite.rel.stream.Delta; - -import java.util.ArrayList; -import java.util.List; - -public abstract class PostOrderRelNodeVisitor<T> { - public final T traverse(RelNode n) throws Exception { - List<T> inputStreams = new ArrayList<>(); - for (RelNode input : n.getInputs()) { - inputStreams.add(traverse(input)); - } - - if (n instanceof Aggregate) { - return visitAggregate((Aggregate) n, inputStreams); - } else if (n instanceof Calc) { - return visitCalc((Calc) n, inputStreams); - } else if (n instanceof Collect) { - return visitCollect((Collect) n, inputStreams); - } else if (n instanceof Correlate) { - return visitCorrelate((Correlate) n, inputStreams); - } else if (n instanceof Delta) { - return visitDelta((Delta) n, inputStreams); - } else if (n instanceof Exchange) { - return visitExchange((Exchange) n, inputStreams); - } else if (n instanceof Project) { - return visitProject((Project) n, inputStreams); - } else if (n instanceof Filter) { - return visitFilter((Filter) n, inputStreams); - } else if (n instanceof Sample) { - return visitSample((Sample) n, inputStreams); - } else if (n instanceof Sort) { - return visitSort((Sort) n, inputStreams); - } else if (n instanceof TableModify) { - return visitTableModify((TableModify) n, inputStreams); - } else if (n instanceof TableScan) { - return visitTableScan((TableScan) n, inputStreams); - } else if (n instanceof Uncollect) { - return visitUncollect((Uncollect) n, inputStreams); - } else if (n instanceof Window) { - return visitWindow((Window) n, inputStreams); - } else if (n instanceof Join) { - return visitJoin((Join) n, inputStreams); - } else { - return defaultValue(n, inputStreams); - } - } - - public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws Exception { - return defaultValue(aggregate, inputStreams); - } - - public T visitCalc(Calc calc, List<T> inputStreams) throws Exception { - return defaultValue(calc, inputStreams); - } - - public T visitCollect(Collect collect, List<T> inputStreams) throws Exception { - return defaultValue(collect, inputStreams); - } - - public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws Exception { - return defaultValue(correlate, inputStreams); - } - - public T visitDelta(Delta delta, List<T> inputStreams) throws Exception { - return defaultValue(delta, inputStreams); - } - - public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception { - return defaultValue(exchange, inputStreams); - } - - public T visitProject(Project project, List<T> inputStreams) throws Exception { - return defaultValue(project, inputStreams); - } - - public T visitFilter(Filter filter, List<T> inputStreams) throws Exception { - return defaultValue(filter, inputStreams); - } - - public T visitSample(Sample sample, List<T> inputStreams) throws Exception { - return defaultValue(sample, inputStreams); - } - - public T visitSort(Sort sort, List<T> inputStreams) throws Exception { - return defaultValue(sort, inputStreams); - } - - public T visitTableModify(TableModify modify, List<T> inputStreams) throws Exception { - return defaultValue(modify, inputStreams); - } - - public T visitTableScan(TableScan scan, List<T> inputStreams) throws Exception { - return defaultValue(scan, inputStreams); - } - - public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws Exception { - return defaultValue(uncollect, inputStreams); - } - - public T visitWindow(Window window, List<T> inputStreams) throws Exception { - return defaultValue(window, inputStreams); - } - - public T visitJoin(Join join, List<T> inputStreams) throws Exception { - return defaultValue(join, inputStreams); - } - - public T defaultValue(RelNode n, List<T> inputStreams) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java deleted file mode 100644 index 97995c7..0000000 --- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java +++ /dev/null @@ -1,484 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.sql.compiler.backends.standalone; - -import com.google.common.base.Joiner; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.logical.LogicalJoin; -import org.apache.calcite.rel.stream.Delta; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.AggregateFunction; -import org.apache.calcite.schema.impl.AggregateFunctionImpl; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; -import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Compile RelNodes into individual functions. - */ -class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> { - public static Joiner NEW_LINE_JOINER = Joiner.on('\n'); - - private final PrintWriter pw; - private final JavaTypeFactory typeFactory; - private final RexNodeToJavaCodeCompiler rexCompiler; - - private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join( - " private static final ChannelHandler %1$s = ", - " new AbstractChannelHandler() {", - " @Override", - " public void dataReceived(ChannelContext ctx, Values _data) {", - "" - ); - - private static final String AGGREGATE_STAGE_PROLOGUE = NEW_LINE_JOINER.join( - " private static final ChannelHandler %1$s = ", - " new AbstractChannelHandler() {", - " private final Values EMPTY_VALUES = new Values();", - " private final Map<List<Object>, Map<String, Object>> state = new LinkedHashMap<>();", - " private final int[] groupIndices = new int[] {%2$s};", - " private List<Object> getGroupValues(Values _data) {", - " List<Object> res = new ArrayList<>();", - " for (int i: groupIndices) {", - " res.add(_data.get(i));", - " }", - " return res;", - " }", - "", - " @Override", - " public void flush(ChannelContext ctx) {", - " emitAggregateResults(ctx);", - " super.flush(ctx);", - " state.clear();", - " }", - "", - " private void emitAggregateResults(ChannelContext ctx) {", - " for (Map.Entry<List<Object>, Map<String, Object>> entry: state.entrySet()) {", - " List<Object> groupValues = entry.getKey();", - " Map<String, Object> accumulators = entry.getValue();", - " %3$s", - " }", - " }", - "", - " @Override", - " public void dataReceived(ChannelContext ctx, Values _data) {", - "" - ); - - private static final String JOIN_STAGE_PROLOGUE = NEW_LINE_JOINER.join( - " private static final ChannelHandler %1$s = ", - " new AbstractChannelHandler() {", - " Object left = %2$s;", - " Object right = %3$s;", - " Object source = null;", - " List<Values> leftRows = new ArrayList<>();", - " List<Values> rightRows = new ArrayList<>();", - " boolean leftDone = false;", - " boolean rightDone = false;", - " int[] ordinals = new int[] {%4$s, %5$s};", - "", - " Multimap<Object, Values> getJoinTable(List<Values> rows, int joinIndex) {", - " Multimap<Object, Values> m = ArrayListMultimap.create();", - " for(Values v: rows) {", - " m.put(v.get(joinIndex), v);", - " }", - " return m;", - " }", - "", - " List<Values> join(Multimap<Object, Values> tab, List<Values> rows, int rowIdx, boolean rev) {", - " List<Values> res = new ArrayList<>();", - " for (Values row: rows) {", - " for (Values mapValue: tab.get(row.get(rowIdx))) {", - " if (mapValue != null) {", - " Values joinedRow = new Values();", - " if(rev) {", - " joinedRow.addAll(row);", - " joinedRow.addAll(mapValue);", - " } else {", - " joinedRow.addAll(mapValue);", - " joinedRow.addAll(row);", - " }", - " res.add(joinedRow);", - " }", - " }", - " }", - " return res;", - " }", - "", - " @Override", - " public void setSource(ChannelContext ctx, Object source) {", - " this.source = source;", - " }", - "", - " @Override", - " public void flush(ChannelContext ctx) {", - " if (source == left) {", - " leftDone = true;", - " } else if (source == right) {", - " rightDone = true;", - " }", - " if (leftDone && rightDone) {", - " if (leftRows.size() <= rightRows.size()) {", - " for(Values res: join(getJoinTable(leftRows, ordinals[0]), rightRows, ordinals[1], false)) {", - " ctx.emit(res);", - " }", - " } else {", - " for(Values res: join(getJoinTable(rightRows, ordinals[1]), leftRows, ordinals[0], true)) {", - " ctx.emit(res);", - " }", - " }", - " leftDone = rightDone = false;", - " leftRows.clear();", - " rightRows.clear();", - " super.flush(ctx);", - " }", - " }", - "", - " @Override", - " public void dataReceived(ChannelContext ctx, Values _data) {", - "" - ); - - private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join( - " private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;", - ""); - - private static final String STAGE_ENUMERABLE_TABLE_SCAN = NEW_LINE_JOINER.join( - " private static final ChannelHandler %1$s = new AbstractChannelHandler() {", - " @Override", - " public void flush(ChannelContext ctx) {", - " ctx.setSource(this);", - " super.flush(ctx);", - " }", - "", - " @Override", - " public void dataReceived(ChannelContext ctx, Values _data) {", - " ctx.setSource(this);", - " ctx.emit(_data);", - " }", - " };", - ""); - - private int nameCount; - private Map<AggregateCall, String> aggregateCallVarNames = new HashMap<>(); - - RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) { - this.pw = pw; - this.typeFactory = typeFactory; - this.rexCompiler = new RexNodeToJavaCodeCompiler(new RexBuilder(typeFactory)); - } - - @Override - public Void visitDelta(Delta delta, List<Void> inputStreams) throws Exception { - pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta))); - return null; - } - - @Override - public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception { - beginStage(filter); - - List<RexNode> childExps = filter.getChildExps(); - RelDataType inputRowType = filter.getInput(0).getRowType(); - - pw.print("Context context = new StormContext(Processor.dataContext);\n"); - pw.print("context.values = _data.toArray();\n"); - pw.print("Object[] outputValues = new Object[1];\n"); - - pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString()); - - String r = "((Boolean) outputValues[0])"; - if (filter.getCondition().getType().isNullable()) { - pw.print(String.format(" if (%s != null && %s) { ctx.emit(_data); }\n", r, r)); - } else { - pw.print(String.format(" if (%s) { ctx.emit(_data); }\n", r, r)); - } - endStage(); - return null; - } - - @Override - public Void visitProject(Project project, List<Void> inputStreams) throws Exception { - beginStage(project); - - List<RexNode> childExps = project.getChildExps(); - RelDataType inputRowType = project.getInput(0).getRowType(); - int outputCount = project.getRowType().getFieldCount(); - - pw.print("Context context = new StormContext(Processor.dataContext);\n"); - pw.print("context.values = _data.toArray();\n"); - pw.print(String.format("Object[] outputValues = new Object[%d];\n", outputCount)); - - pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString()); - - pw.print(" ctx.emit(new Values(outputValues));\n"); - endStage(); - return null; - } - - @Override - public Void defaultValue(RelNode n, List<Void> inputStreams) { - throw new UnsupportedOperationException(); - } - - @Override - public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws Exception { - pw.print(String.format(STAGE_ENUMERABLE_TABLE_SCAN, getStageName(scan))); - return null; - } - - @Override - public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception { - beginAggregateStage(aggregate); - pw.println(" if (_data != null) {"); - pw.println(" List<Object> curGroupValues = getGroupValues(_data);"); - pw.println(" if (!state.containsKey(curGroupValues)) {"); - pw.println(" state.put(curGroupValues, new HashMap<String, Object>());"); - pw.println(" }"); - pw.println(" Map<String, Object> accumulators = state.get(curGroupValues);"); - for (AggregateCall call : aggregate.getAggCallList()) { - aggregate(call); - } - pw.println(" }"); - endStage(); - return null; - } - - @Override - public Void visitJoin(Join join, List<Void> inputStreams) { - beginJoinStage(join); - pw.println(" if (source == left) {"); - pw.println(" leftRows.add(_data);"); - pw.println(" } else if (source == right) {"); - pw.println(" rightRows.add(_data);"); - pw.println(" }"); - endStage(); - return null; - } - - private String groupValueEmitStr(String var, int n) { - int count = 0; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < n; i++) { - if (++count > 1) { - sb.append(", "); - } - sb.append(var).append(".").append("get(").append(i).append(")"); - } - return sb.toString(); - } - - private String emitAggregateStmts(Aggregate aggregate) { - List<String> res = new ArrayList<>(); - StringWriter sw = new StringWriter(); - for (AggregateCall call : aggregate.getAggCallList()) { - res.add(aggregateResult(call, new PrintWriter(sw))); - } - return NEW_LINE_JOINER.join(sw.toString(), - String.format(" ctx.emit(new Values(%s, %s));", - groupValueEmitStr("groupValues", aggregate.getGroupSet().cardinality()), - Joiner.on(", ").join(res))); - } - - private String aggregateResult(AggregateCall call, PrintWriter pw) { - SqlAggFunction aggFunction = call.getAggregation(); - String aggregationName = call.getAggregation().getName(); - Type ty = typeFactory.getJavaClass(call.getType()); - String result; - if (aggFunction instanceof SqlUserDefinedAggFunction) { - AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) aggFunction).function; - result = doAggregateResult((AggregateFunctionImpl) aggregateFunction, reserveAggVarName(call), ty, pw); - } else { - List<BuiltinAggregateFunctions.TypeClass> typeClasses = BuiltinAggregateFunctions.TABLE.get(aggregationName); - if (typeClasses == null) { - throw new UnsupportedOperationException(aggregationName + " Not implemented"); - } - result = doAggregateResult(AggregateFunctionImpl.create(findMatchingClass(aggregationName, typeClasses, ty)), - reserveAggVarName(call), ty, pw); - } - return result; - } - - private String doAggregateResult(AggregateFunctionImpl aggFn, String varName, Type ty, PrintWriter pw) { - String resultName = varName + "_result"; - Class<?> accumulatorType = aggFn.accumulatorType; - Class<?> resultType = aggFn.resultType; - List<String> args = new ArrayList<>(); - if (!aggFn.isStatic) { - String aggObjName = String.format("%s_obj", varName); - String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName(); - pw.println(" @SuppressWarnings(\"unchecked\")"); - pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName, - aggObjName)); - args.add(aggObjName); - } - args.add(String.format("(%s)accumulators.get(\"%s\")", accumulatorType.getCanonicalName(), varName)); - pw.println(String.format(" final %s %s = %s;", resultType.getCanonicalName(), - resultName, printMethodCall(aggFn.resultMethod, args))); - - return resultName; - } - - private void aggregate(AggregateCall call) { - SqlAggFunction aggFunction = call.getAggregation(); - String aggregationName = call.getAggregation().getName(); - Type ty = typeFactory.getJavaClass(call.getType()); - if (call.getArgList().size() != 1) { - if (aggregationName.equals("COUNT")) { - if (call.getArgList().size() != 0) { - throw new UnsupportedOperationException("Count with nullable fields"); - } - } - } - if (aggFunction instanceof SqlUserDefinedAggFunction) { - AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) aggFunction).function; - doAggregate((AggregateFunctionImpl) aggregateFunction, reserveAggVarName(call), ty, call.getArgList()); - } else { - List<BuiltinAggregateFunctions.TypeClass> typeClasses = BuiltinAggregateFunctions.TABLE.get(aggregationName); - if (typeClasses == null) { - throw new UnsupportedOperationException(aggregationName + " Not implemented"); - } - doAggregate(AggregateFunctionImpl.create(findMatchingClass(aggregationName, typeClasses, ty)), - reserveAggVarName(call), ty, call.getArgList()); - } - } - - private Class<?> findMatchingClass(String aggregationName, List<BuiltinAggregateFunctions.TypeClass> typeClasses, Type ty) { - for (BuiltinAggregateFunctions.TypeClass typeClass : typeClasses) { - if (typeClass.ty.equals(BuiltinAggregateFunctions.TypeClass.GenericType.class) || typeClass.ty.equals(ty)) { - return typeClass.clazz; - } - } - throw new UnsupportedOperationException(aggregationName + " Not implemeted for type '" + ty + "'"); - } - - private void doAggregate(AggregateFunctionImpl aggFn, String varName, Type ty, List<Integer> argList) { - List<String> args = new ArrayList<>(); - Class<?> accumulatorType = aggFn.accumulatorType; - if (!aggFn.isStatic) { - String aggObjName = String.format("%s_obj", varName); - String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName(); - pw.println(String.format(" if (!accumulators.containsKey(\"%s\")) { ", aggObjName)); - pw.println(String.format(" accumulators.put(\"%s\", new %s());", aggObjName, aggObjClassName)); - pw.println(" }"); - pw.println(" @SuppressWarnings(\"unchecked\")"); - pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName, - aggObjName)); - args.add(aggObjName); - } - args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s", - "accumulators.get(\"" + varName + "\")", - printMethodCall(aggFn.initMethod, args), - accumulatorType.getCanonicalName())); - if (argList.isEmpty()) { - args.add("EMPTY_VALUES"); - } else { - for (int i = 0; i < aggFn.valueTypes.size(); i++) { - args.add(String.format("(%s) %s", aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + ")")); - } - } - pw.print(String.format(" accumulators.put(\"%s\", %s);\n", - varName, - printMethodCall(aggFn.addMethod, args))); - } - - private String reserveAggVarName(AggregateCall call) { - String varName; - if ((varName = aggregateCallVarNames.get(call)) == null) { - varName = call.getAggregation().getName() + ++nameCount; - aggregateCallVarNames.put(call, varName); - } - return varName; - } - - private void beginStage(RelNode n) { - pw.print(String.format(STAGE_PROLOGUE, getStageName(n))); - } - - private void beginAggregateStage(Aggregate n) { - pw.print(String.format(AGGREGATE_STAGE_PROLOGUE, getStageName(n), getGroupByIndices(n), emitAggregateStmts(n))); - } - - private void beginJoinStage(Join join) { - int[] ordinals = new int[2]; - if (!RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, ordinals)) { - throw new UnsupportedOperationException("Only simple equi joins are supported"); - } - - pw.print(String.format(JOIN_STAGE_PROLOGUE, getStageName(join), - getStageName(join.getLeft()), - getStageName(join.getRight()), - ordinals[0], - ordinals[1])); - } - - private void endStage() { - pw.print(" }\n };\n"); - } - - static String getStageName(RelNode n) { - return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId(); - } - - private String getGroupByIndices(Aggregate n) { - StringBuilder res = new StringBuilder(); - int count = 0; - for (int i : n.getGroupSet()) { - if (++count > 1) { - res.append(", "); - } - res.append(i); - } - return res.toString(); - } - - public static String printMethodCall(Method method, List<String> args) { - return printMethodCall(method.getDeclaringClass(), method.getName(), - Modifier.isStatic(method.getModifiers()), args); - } - - private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) { - if (isStatic) { - return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args)); - } else { - return String.format("%s.%s(%s)", args.get(0), method, - Joiner.on(',').join(args.subList(1, args.size()))); - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java new file mode 100644 index 0000000..5cd4054 --- /dev/null +++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql; + +import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues; + +import java.util.concurrent.Callable; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.sql.javac.CompilingClassLoader; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.utils.Utils; + +public final class SqlTestUtil { + + public static void runTridentTopology(LocalCluster cluster, final int expectedValueSize, AbstractTridentProcessor proc, + TridentTopology topo) throws Exception { + final Config conf = new Config(); + conf.setMaxSpoutPending(20); + + if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) { + CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1); + Utils.setClassLoaderForJavaDeSerialize(lastClassloader); + } + + try (LocalCluster.LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) { + waitForCompletion(1000 * 1000, new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return getCollectedValues().size() < expectedValueSize; + } + }); + } finally { + while(cluster.getClusterInfo().get_topologies_size() > 0) { + Thread.sleep(10); + } + Utils.resetClassLoaderForJavaDeSerialize(); + } + } + + private static void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception { + long start = TestUtils.monotonicNow(); + while (TestUtils.monotonicNow() - start < timeout && cond.call()) { + Thread.sleep(100); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java new file mode 100644 index 0000000..b3acee5 --- /dev/null +++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql; + +import java.util.function.Predicate; + +import org.apache.calcite.sql.SqlNode; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.sql.javac.CompilingClassLoader; +import org.apache.storm.sql.parser.SqlCreateFunction; +import org.apache.storm.sql.parser.SqlCreateTable; +import org.apache.storm.sql.parser.StormParser; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.utils.Utils; + +public class StormSqlLocalClusterImpl { + private final StormSqlContext sqlContext; + + public StormSqlLocalClusterImpl() { + sqlContext = new StormSqlContext(); + } + + public void runLocal(LocalCluster localCluster, Iterable<String> statements, + Predicate<Void> waitCondition, long waitTimeoutMs) throws Exception { + final Config conf = new Config(); + conf.setMaxSpoutPending(20); + + for (String sql : statements) { + StormParser parser = new StormParser(sql); + SqlNode node = parser.impl().parseSqlStmtEof(); + if (node instanceof SqlCreateTable) { + sqlContext.interpretCreateTable((SqlCreateTable) node); + } else if (node instanceof SqlCreateFunction) { + sqlContext.interpretCreateFunction((SqlCreateFunction) node); + } else { + AbstractTridentProcessor processor = sqlContext.compileSql(sql); + TridentTopology topo = processor.build(); + + if (processor.getClassLoaders() != null && processor.getClassLoaders().size() > 0) { + CompilingClassLoader lastClassloader = processor.getClassLoaders().get(processor.getClassLoaders().size() - 1); + Utils.setClassLoaderForJavaDeSerialize(lastClassloader); + } + + try (LocalCluster.LocalTopology stormTopo = localCluster.submitTopology("storm-sql", conf, topo.build())) { + waitForCompletion(waitTimeoutMs, waitCondition); + } finally { + while(localCluster.getClusterInfo().get_topologies_size() > 0) { + Thread.sleep(10); + } + Utils.resetClassLoaderForJavaDeSerialize(); + } + } + } + } + + private static void waitForCompletion(long timeout, Predicate<Void> cond) throws Exception { + long start = TestUtils.monotonicNow(); + while (TestUtils.monotonicNow() - start < timeout && !cond.test(null)) { + Thread.sleep(100); + } + } +}
