STORM-2845 Drop standalone mode of Storm SQL

* remove all interfaces/classes related to standalone mode
* migrate tests associated with standalone mode to trident mode
* remove comments from tests regarding issues which Calcite has fixed


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1dcd12de
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1dcd12de
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1dcd12de

Branch: refs/heads/master
Commit: 1dcd12de27b813a8c3e42d600028fa97df0a5f15
Parents: 4495f66
Author: Jungtaek Lim <[email protected]>
Authored: Wed Dec 6 19:52:40 2017 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Thu Dec 7 09:59:28 2017 +0900

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/sql/StormSql.java  |   8 -
 .../org/apache/storm/sql/StormSqlContext.java   | 172 +++++
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  | 180 +----
 .../standalone/BuiltinAggregateFunctions.java   | 238 -------
 .../backends/standalone/PlanCompiler.java       | 139 ----
 .../standalone/PostOrderRelNodeVisitor.java     | 132 ----
 .../backends/standalone/RelNodeCompiler.java    | 484 -------------
 .../test/org/apache/storm/sql/SqlTestUtil.java  |  65 ++
 .../storm/sql/StormSqlLocalClusterImpl.java     |  80 +++
 .../test/org/apache/storm/sql/TestStormSql.java | 676 +++++++------------
 .../storm/sql/compiler/TestExprSemantic.java    | 410 -----------
 .../backends/standalone/TestCompilerUtils.java  | 183 -----
 .../backends/standalone/TestPlanCompiler.java   | 104 ---
 .../standalone/TestRelNodeCompiler.java         |  64 --
 .../backends/trident/TestExpressions.java       | 359 ++++++++++
 .../backends/trident/TestPlanCompiler.java      | 119 ++--
 .../storm/sql/hdfs/HdfsDataSourcesProvider.java |   7 -
 .../sql/kafka/KafkaDataSourcesProvider.java     |   7 -
 .../sql/mongodb/MongoDataSourcesProvider.java   |   7 -
 .../sql/redis/RedisDataSourcesProvider.java     |   6 -
 .../sql/runtime/AbstractChannelHandler.java     |  53 --
 .../sql/runtime/AbstractValuesProcessor.java    |  45 --
 .../storm/sql/runtime/ChannelContext.java       |  34 -
 .../storm/sql/runtime/ChannelHandler.java       |  42 --
 .../org/apache/storm/sql/runtime/Channels.java  | 110 ---
 .../apache/storm/sql/runtime/DataSource.java    |  28 -
 .../storm/sql/runtime/DataSourcesProvider.java  |  14 -
 .../storm/sql/runtime/DataSourcesRegistry.java  |  19 -
 .../storm/sql/runtime/StormSqlFunctions.java    |  48 --
 .../socket/SocketDataSourcesProvider.java       |   6 -
 .../test/org/apache/storm/sql/TestUtils.java    | 190 ++----
 31 files changed, 1048 insertions(+), 2981 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java 
b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
index 3fbc463..49e4162 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -19,7 +19,6 @@ package org.apache.storm.sql;
 
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.sql.runtime.ChannelHandler;
 
 import java.util.Map;
 
@@ -33,13 +32,6 @@ import java.util.Map;
  */
 public abstract class StormSql {
   /**
-   * Execute the SQL statements in stand-alone mode. The user can retrieve the 
result by passing in an instance
-   * of {@see ChannelHandler}.
-   */
-  public abstract void execute(Iterable<String> statements,
-                               ChannelHandler handler) throws Exception;
-
-  /**
    * Submit the SQL statements to Nimbus and run it as a topology.
    */
   public abstract void submit(

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java 
b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
new file mode 100644
index 0000000..bf7ef3e
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
+import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateFunction;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.trident.QueryPlanner;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+
+public class StormSqlContext {
+    private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
+            RelDataTypeSystem.DEFAULT);
+    private final SchemaPlus schema = Frameworks.createRootSchema(true);
+    private boolean hasUdf = false;
+    private Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
+
+    public void interpretCreateTable(SqlCreateTable n) {
+        CompilerUtil.TableBuilderInfo builder = new 
CompilerUtil.TableBuilderInfo(typeFactory);
+        List<FieldInfo> fields = new ArrayList<>();
+        for (ColumnDefinition col : n.fieldList()) {
+            builder.field(col.name(), col.type(), col.constraint());
+            RelDataType dataType = col.type().deriveType(typeFactory);
+            Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
+            ColumnConstraint constraint = col.constraint();
+            boolean isPrimary = constraint != null && constraint instanceof 
ColumnConstraint.PrimaryKey;
+            fields.add(new FieldInfo(col.name(), javaType, isPrimary));
+        }
+
+        if (n.parallelism() != null) {
+            builder.parallelismHint(n.parallelism());
+        }
+        Table table = builder.build();
+        schema.add(n.tableName(), table);
+
+        ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(n.location(), n
+                .inputFormatClass(), n.outputFormatClass(), n.properties(), 
fields);
+        if (ds == null) {
+            throw new RuntimeException("Failed to find data source for " + n
+                    .tableName() + " URI: " + n.location());
+        } else if (dataSources.containsKey(n.tableName())) {
+            throw new RuntimeException("Duplicated definition for table " + n
+                    .tableName());
+        }
+        dataSources.put(n.tableName(), ds);
+    }
+
+    public void interpretCreateFunction(SqlCreateFunction sqlCreateFunction) 
throws ClassNotFoundException {
+        if(sqlCreateFunction.jarName() != null) {
+            throw new UnsupportedOperationException("UDF 'USING JAR' not 
implemented");
+        }
+        Method method;
+        Function function;
+        if ((method=findMethod(sqlCreateFunction.className(), "evaluate")) != 
null) {
+            function = ScalarFunctionImpl.create(method);
+        } else if (findMethod(sqlCreateFunction.className(), "add") != null) {
+            function = 
AggregateFunctionImpl.create(Class.forName(sqlCreateFunction.className()));
+        } else {
+            throw new RuntimeException("Invalid scalar or aggregate function");
+        }
+        schema.add(sqlCreateFunction.functionName().toUpperCase(), function);
+        hasUdf = true;
+    }
+
+    public AbstractTridentProcessor compileSql(String query) throws Exception {
+        QueryPlanner planner = new QueryPlanner(schema);
+        return planner.compile(dataSources, query);
+    }
+
+    public String explain(String query) throws SqlParseException, 
ValidationException, RelConversionException {
+        FrameworkConfig config = buildFrameWorkConfig();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(query);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+
+        return StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
+    }
+
+    public FrameworkConfig buildFrameWorkConfig() {
+        if (hasUdf) {
+            List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+            sqlOperatorTables.add(SqlStdOperatorTable.instance());
+            sqlOperatorTables.add(new 
CalciteCatalogReader(CalciteSchema.from(schema),
+                    false,
+                    Collections.<String>emptyList(), typeFactory));
+            return Frameworks.newConfigBuilder().defaultSchema(schema)
+                    .operatorTable(new 
ChainedSqlOperatorTable(sqlOperatorTables)).build();
+        } else {
+            return Frameworks.newConfigBuilder().defaultSchema(schema).build();
+        }
+    }
+
+    public JavaTypeFactory getTypeFactory() {
+        return typeFactory;
+    }
+
+    public SchemaPlus getSchema() {
+        return schema;
+    }
+
+    public boolean isHasUdf() {
+        return hasUdf;
+    }
+
+    public Map<String, ISqlTridentDataSource> getDataSources() {
+        return dataSources;
+    }
+
+    private Method findMethod(String clazzName, String methodName) throws 
ClassNotFoundException {
+        Class<?> clazz = Class.forName(clazzName);
+        for (Method method : clazz.getMethods()) {
+            if (method.getName().equals(methodName)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java 
b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index 1c83ac8..a771793 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -17,55 +17,21 @@
  */
 package org.apache.storm.sql;
 
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
-import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
 import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.parser.ColumnConstraint;
-import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.trident.TridentTopology;
 
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.jar.Attributes;
@@ -73,37 +39,11 @@ import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.zip.ZipEntry;
 
-import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
-
 class StormSqlImpl extends StormSql {
-  private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-  private final SchemaPlus schema = Frameworks.createRootSchema(true);
-  private boolean hasUdf = false;
+  private final StormSqlContext sqlContext;
 
-  @Override
-  public void execute(
-      Iterable<String> statements, ChannelHandler result)
-      throws Exception {
-    Map<String, DataSource> dataSources = new HashMap<>();
-    for (String sql : statements) {
-      StormParser parser = new StormParser(sql);
-      SqlNode node = parser.impl().parseSqlStmtEof();
-      if (node instanceof SqlCreateTable) {
-        handleCreateTable((SqlCreateTable) node, dataSources);
-      } else if (node instanceof SqlCreateFunction) {
-        handleCreateFunction((SqlCreateFunction) node);
-      } else {
-        FrameworkConfig config = buildFrameWorkConfig();
-        Planner planner = Frameworks.getPlanner(config);
-        SqlNode parse = planner.parse(sql);
-        SqlNode validate = planner.validate(parse);
-        RelNode tree = planner.convert(validate);
-        PlanCompiler compiler = new PlanCompiler(typeFactory);
-        AbstractValuesProcessor proc = compiler.compile(tree);
-        proc.initialize(dataSources, result);
-      }
-    }
+  public StormSqlImpl() {
+    sqlContext = new StormSqlContext();
   }
 
   @Override
@@ -111,17 +51,15 @@ class StormSqlImpl extends StormSql {
       String name, Iterable<String> statements, Map<String, Object> topoConf, 
SubmitOptions opts,
       StormSubmitter.ProgressListener progressListener, String asUser)
       throws Exception {
-    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
     for (String sql : statements) {
       StormParser parser = new StormParser(sql);
       SqlNode node = parser.impl().parseSqlStmtEof();
       if (node instanceof SqlCreateTable) {
-        handleCreateTableForTrident((SqlCreateTable) node, dataSources);
+        sqlContext.interpretCreateTable((SqlCreateTable) node);
       } else if (node instanceof SqlCreateFunction) {
-        handleCreateFunction((SqlCreateFunction) node);
-      }  else {
-        QueryPlanner planner = new QueryPlanner(schema);
-        AbstractTridentProcessor processor = planner.compile(dataSources, sql);
+        sqlContext.interpretCreateFunction((SqlCreateFunction) node);
+      } else {
+        AbstractTridentProcessor processor = sqlContext.compileSql(sql);
         TridentTopology topo = processor.build();
 
         Path jarPath = null;
@@ -145,7 +83,6 @@ class StormSqlImpl extends StormSql {
 
   @Override
   public void explain(Iterable<String> statements) throws Exception {
-    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
     for (String sql : statements) {
       StormParser parser = new StormParser(sql);
       SqlNode node = parser.impl().parseSqlStmtEof();
@@ -156,19 +93,13 @@ class StormSqlImpl extends StormSql {
       
System.out.println("-----------------------------------------------------------");
 
       if (node instanceof SqlCreateTable) {
-        handleCreateTableForTrident((SqlCreateTable) node, dataSources);
+        sqlContext.interpretCreateTable((SqlCreateTable) node);
         System.out.println("No plan presented on DDL");
       } else if (node instanceof SqlCreateFunction) {
-        handleCreateFunction((SqlCreateFunction) node);
+        sqlContext.interpretCreateFunction((SqlCreateFunction) node);
         System.out.println("No plan presented on DDL");
       } else {
-        FrameworkConfig config = buildFrameWorkConfig();
-        Planner planner = Frameworks.getPlanner(config);
-        SqlNode parse = planner.parse(sql);
-        SqlNode validate = planner.validate(parse);
-        RelNode tree = planner.convert(validate);
-
-        String plan = StormRelUtils.explain(tree, 
SqlExplainLevel.ALL_ATTRIBUTES);
+        String plan = sqlContext.explain(sql);
         System.out.println("plan>");
         System.out.println(plan);
       }
@@ -196,95 +127,4 @@ class StormSqlImpl extends StormSql {
       }
     }
   }
-
-  private void handleCreateTable(
-      SqlCreateTable n, Map<String, DataSource> dataSources) {
-    List<FieldInfo> fields = updateSchema(n);
-    DataSource ds = DataSourcesRegistry.construct(n.location(), n
-        .inputFormatClass(), n.outputFormatClass(), fields);
-    if (ds == null) {
-      throw new RuntimeException("Cannot construct data source for " + n
-          .tableName());
-    } else if (dataSources.containsKey(n.tableName())) {
-      throw new RuntimeException("Duplicated definition for table " + n
-          .tableName());
-    }
-    dataSources.put(n.tableName(), ds);
-  }
-
-  private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) 
throws ClassNotFoundException {
-    if(sqlCreateFunction.jarName() != null) {
-      throw new UnsupportedOperationException("UDF 'USING JAR' not 
implemented");
-    }
-    Method method;
-    Function function;
-    if ((method=findMethod(sqlCreateFunction.className(), "evaluate")) != 
null) {
-      function = ScalarFunctionImpl.create(method);
-    } else if (findMethod(sqlCreateFunction.className(), "add") != null) {
-      function = 
AggregateFunctionImpl.create(Class.forName(sqlCreateFunction.className()));
-    } else {
-      throw new RuntimeException("Invalid scalar or aggregate function");
-    }
-    schema.add(sqlCreateFunction.functionName().toUpperCase(), function);
-    hasUdf = true;
-  }
-
-  private Method findMethod(String clazzName, String methodName) throws 
ClassNotFoundException {
-    Class<?> clazz = Class.forName(clazzName);
-    for (Method method : clazz.getMethods()) {
-      if (method.getName().equals(methodName)) {
-        return method;
-      }
-    }
-    return null;
-  }
-
-  private void handleCreateTableForTrident(
-      SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) {
-    List<FieldInfo> fields = updateSchema(n);
-    ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(n.location(), n
-        .inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
-    if (ds == null) {
-      throw new RuntimeException("Failed to find data source for " + n
-          .tableName() + " URI: " + n.location());
-    } else if (dataSources.containsKey(n.tableName())) {
-      throw new RuntimeException("Duplicated definition for table " + n
-          .tableName());
-    }
-    dataSources.put(n.tableName(), ds);
-  }
-
-  private List<FieldInfo> updateSchema(SqlCreateTable n) {
-    TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
-    List<FieldInfo> fields = new ArrayList<>();
-    for (ColumnDefinition col : n.fieldList()) {
-      builder.field(col.name(), col.type(), col.constraint());
-      RelDataType dataType = col.type().deriveType(typeFactory);
-      Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
-      ColumnConstraint constraint = col.constraint();
-      boolean isPrimary = constraint != null && constraint instanceof 
ColumnConstraint.PrimaryKey;
-      fields.add(new FieldInfo(col.name(), javaType, isPrimary));
-    }
-
-    if (n.parallelism() != null) {
-      builder.parallelismHint(n.parallelism());
-    }
-    Table table = builder.build();
-    schema.add(n.tableName(), table);
-    return fields;
-  }
-
-  private FrameworkConfig buildFrameWorkConfig() {
-    if (hasUdf) {
-      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-      sqlOperatorTables.add(SqlStdOperatorTable.instance());
-      sqlOperatorTables.add(new 
CalciteCatalogReader(CalciteSchema.from(schema),
-                                                     false,
-                                                     
Collections.<String>emptyList(), typeFactory));
-      return Frameworks.newConfigBuilder().defaultSchema(schema)
-              .operatorTable(new 
ChainedSqlOperatorTable(sqlOperatorTables)).build();
-    } else {
-      return Frameworks.newConfigBuilder().defaultSchema(schema).build();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
 
b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
deleted file mode 100644
index 9dc4ba8..0000000
--- 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.storm.tuple.Values;
-
-import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Built-in implementations for some of the standard aggregation operations.
- * Aggregations can be implemented as a class with the following methods viz. 
init, add and result.
- * The class could contain only static methods, only non-static methods or be 
generic.
- */
-public class BuiltinAggregateFunctions {
-    // binds the type information and the class implementing the aggregation
-    public static class TypeClass {
-        public static class GenericType {
-        }
-
-        public final Type ty;
-        public final Class<?> clazz;
-
-        private TypeClass(Type ty, Class<?> clazz) {
-            this.ty = ty;
-            this.clazz = clazz;
-        }
-
-        static TypeClass of(Type ty, Class<?> clazz) {
-            return new TypeClass(ty, clazz);
-        }
-    }
-
-    static final Map<String, List<TypeClass>> TABLE = new HashMap<>();
-
-    public static class ByteSum {
-        public static Byte init() {
-            return 0;
-        }
-
-        public static Byte add(Byte accumulator, Byte val) {
-            return (byte) (accumulator + val);
-        }
-
-        public static Byte result(Byte accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class ShortSum {
-        public static Short init() {
-            return 0;
-        }
-
-        public static Short add(Short accumulator, Short val) {
-            return (short) (accumulator + val);
-        }
-
-        public static Short result(Short accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class IntSum {
-        public static Integer init() {
-            return 0;
-        }
-
-        public static Integer add(Integer accumulator, Integer val) {
-            return accumulator + val;
-        }
-
-        public static Integer result(Integer accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class LongSum {
-        public static Long init() {
-            return 0L;
-        }
-
-        public static Long add(Long accumulator, Long val) {
-            return accumulator + val;
-        }
-
-        public static Long result(Long accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class FloatSum {
-        public static Float init() {
-            return 0.0f;
-        }
-
-        public static Float add(Float accumulator, Float val) {
-            return accumulator + val;
-        }
-
-        public static Float result(Float accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class DoubleSum {
-        public static Double init() {
-            return 0.0;
-        }
-
-        public static Double add(Double accumulator, Double val) {
-            return accumulator + val;
-        }
-
-        public static Double result(Double accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class Max<T extends Comparable<T>> {
-        public T init() {
-            return null;
-        }
-
-        public T add(T accumulator, T val) {
-            return (accumulator == null || accumulator.compareTo(val) < 0) ? 
val : accumulator;
-        }
-
-        public T result(T accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class Min<T extends Comparable<T>> {
-        public T init() {
-            return null;
-        }
-
-        public T add(T accumulator, T val) {
-            return (accumulator == null || accumulator.compareTo(val) > 0) ? 
val : accumulator;
-        }
-
-        public T result(T accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class IntAvg {
-        private int count;
-
-        public Integer init() {
-            return 0;
-        }
-
-        public Integer add(Integer accumulator, Integer val) {
-            ++count;
-            return accumulator + val;
-        }
-
-        public Integer result(Integer accumulator) {
-            Integer result = accumulator / count;
-            count = 0;
-            return result;
-        }
-    }
-
-    public static class DoubleAvg {
-        private int count;
-
-        public Double init() {
-            return 0.0;
-        }
-
-        public Double add(Double accumulator, Double val) {
-            ++count;
-            return accumulator + val;
-        }
-
-        public Double result(Double accumulator) {
-            Double result = accumulator / count;
-            count = 0;
-            return result;
-        }
-    }
-
-    public static class Count {
-        public static Long init() {
-            return 0L;
-        }
-
-        public static Long add(Long accumulator, Values vals) {
-            for (Object val : vals) {
-                if (val == null) {
-                    return accumulator;
-                }
-            }
-            return accumulator + 1;
-        }
-
-        public static Long result(Long accumulator) {
-            return accumulator;
-        }
-    }
-
-    static {
-        TABLE.put("SUM", ImmutableList.of(
-                TypeClass.of(float.class, FloatSum.class),
-                TypeClass.of(double.class, DoubleSum.class),
-                TypeClass.of(byte.class, ByteSum.class),
-                TypeClass.of(short.class, ShortSum.class),
-                TypeClass.of(long.class, LongSum.class),
-                TypeClass.of(int.class, IntSum.class)));
-        TABLE.put("AVG", ImmutableList.of(
-                TypeClass.of(double.class, DoubleAvg.class),
-                TypeClass.of(int.class, IntAvg.class)));
-        TABLE.put("COUNT", ImmutableList.of(TypeClass.of(long.class, 
Count.class)));
-        TABLE.put("MAX", 
ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Max.class)));
-        TABLE.put("MIN", 
ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Min.class)));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
 
b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
deleted file mode 100644
index 01546ed..0000000
--- 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.HashSet;
-import java.util.Set;
-
-public class PlanCompiler {
-  private static final Logger LOG = 
LoggerFactory.getLogger(PlanCompiler.class);
-
-  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
-  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
-  private static final String PROLOGUE = NEW_LINE_JOINER.join(
-      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
-      "import java.util.Iterator;", "import java.util.Map;", "import 
java.util.HashMap;",
-      "import java.util.List;", "import java.util.ArrayList;",
-      "import java.util.LinkedHashMap;",
-      "import org.apache.storm.tuple.Values;",
-      "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
-      "import org.apache.storm.sql.runtime.Channels;",
-      "import org.apache.storm.sql.runtime.ChannelContext;",
-      "import org.apache.storm.sql.runtime.ChannelHandler;",
-      "import org.apache.storm.sql.runtime.DataSource;",
-      "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
-      "import com.google.common.collect.ArrayListMultimap;",
-      "import com.google.common.collect.Multimap;",
-      "import org.apache.calcite.interpreter.Context;",
-      "import org.apache.calcite.interpreter.StormContext;",
-      "import org.apache.calcite.DataContext;",
-      "import org.apache.storm.sql.runtime.calcite.StormDataContext;",
-      "public final class Processor extends AbstractValuesProcessor {",
-      "  public final static DataContext dataContext = new 
StormDataContext();",
-      "");
-  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
-      "  @Override",
-      "  public void initialize(Map<String, DataSource> data,",
-      "                         ChannelHandler result) {",
-      "    ChannelContext r = Channels.chain(Channels.voidContext(), result);",
-      ""
-  );
-
-  private final JavaTypeFactory typeFactory;
-
-  public PlanCompiler(JavaTypeFactory typeFactory) {
-    this.typeFactory = typeFactory;
-  }
-
-  private String generateJavaSource(RelNode root) throws Exception {
-    StringWriter sw = new StringWriter();
-    try (PrintWriter pw = new PrintWriter(sw)) {
-      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      printPrologue(pw);
-      compiler.traverse(root);
-      printMain(pw, root);
-      printEpilogue(pw);
-    }
-    return sw.toString();
-  }
-
-  private void printMain(PrintWriter pw, RelNode root) {
-    Set<TableScan> tables = new HashSet<>();
-    pw.print(INITIALIZER_PROLOGUE);
-    chainOperators(pw, root, tables);
-    for (TableScan n : tables) {
-      String escaped = CompilerUtil.escapeJavaString(
-          Joiner.on('.').join(n.getTable().getQualifiedName()), true);
-      String r = NEW_LINE_JOINER.join(
-          "    if (!data.containsKey(%1$s))",
-          "      throw new RuntimeException(\"Cannot find table \" + %1$s);",
-          "  data.get(%1$s).open(CTX_%2$d);",
-          "");
-      pw.print(String.format(r, escaped, n.getId()));
-    }
-    pw.print("  }\n");
-  }
-
-  private void chainOperators(PrintWriter pw, RelNode root, Set<TableScan> 
tables) {
-    doChainOperators(pw, root, tables, "r");
-  }
-
-  private void doChainOperators(PrintWriter pw, RelNode node, Set<TableScan> 
tables, String parentCtx) {
-    pw.print(
-            String.format("    ChannelContext CTX_%d = Channels.chain(%2$s, 
%3$s);\n",
-                          node.getId(), parentCtx, 
RelNodeCompiler.getStageName(node)));
-    String currentCtx = String.format("CTX_%d", node.getId());
-    if (node instanceof TableScan) {
-      tables.add((TableScan) node);
-    }
-    for (RelNode i : node.getInputs()) {
-      doChainOperators(pw, i, tables, currentCtx);
-    }
-  }
-
-  public AbstractValuesProcessor compile(RelNode plan) throws Exception {
-    String javaCode = generateJavaSource(plan);
-    LOG.debug("Compiling... source code {}", javaCode);
-    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
-                                              PACKAGE_NAME + ".Processor",
-                                              javaCode, null);
-    return (AbstractValuesProcessor) cl.loadClass(
-        PACKAGE_NAME + ".Processor").newInstance();
-  }
-
-  private static void printEpilogue(
-      PrintWriter pw) throws Exception {
-    pw.print("}\n");
-  }
-
-  private static void printPrologue(PrintWriter pw) {
-    pw.append(PROLOGUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
 
b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
deleted file mode 100644
index afed8a9..0000000
--- 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.*;
-import org.apache.calcite.rel.stream.Delta;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class PostOrderRelNodeVisitor<T> {
-  public final T traverse(RelNode n) throws Exception {
-    List<T> inputStreams = new ArrayList<>();
-    for (RelNode input : n.getInputs()) {
-      inputStreams.add(traverse(input));
-    }
-
-    if (n instanceof Aggregate) {
-      return visitAggregate((Aggregate) n, inputStreams);
-    } else if (n instanceof Calc) {
-      return visitCalc((Calc) n, inputStreams);
-    } else if (n instanceof Collect) {
-      return visitCollect((Collect) n, inputStreams);
-    } else if (n instanceof Correlate) {
-      return visitCorrelate((Correlate) n, inputStreams);
-    } else if (n instanceof Delta) {
-      return visitDelta((Delta) n, inputStreams);
-    } else if (n instanceof Exchange) {
-      return visitExchange((Exchange) n, inputStreams);
-    } else if (n instanceof Project) {
-      return visitProject((Project) n, inputStreams);
-    } else if (n instanceof Filter) {
-      return visitFilter((Filter) n, inputStreams);
-    } else if (n instanceof Sample) {
-      return visitSample((Sample) n, inputStreams);
-    } else if (n instanceof Sort) {
-      return visitSort((Sort) n, inputStreams);
-    } else if (n instanceof TableModify) {
-      return visitTableModify((TableModify) n, inputStreams);
-    } else if (n instanceof TableScan) {
-      return visitTableScan((TableScan) n, inputStreams);
-    } else if (n instanceof Uncollect) {
-      return visitUncollect((Uncollect) n, inputStreams);
-    } else if (n instanceof Window) {
-      return visitWindow((Window) n, inputStreams);
-    } else if (n instanceof Join) {
-      return visitJoin((Join) n, inputStreams);
-    } else {
-      return defaultValue(n, inputStreams);
-    }
-  }
-
-  public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws 
Exception {
-    return defaultValue(aggregate, inputStreams);
-  }
-
-  public T visitCalc(Calc calc, List<T> inputStreams) throws Exception {
-    return defaultValue(calc, inputStreams);
-  }
-
-  public T visitCollect(Collect collect, List<T> inputStreams) throws 
Exception {
-    return defaultValue(collect, inputStreams);
-  }
-
-  public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws 
Exception {
-    return defaultValue(correlate, inputStreams);
-  }
-
-  public T visitDelta(Delta delta, List<T> inputStreams) throws Exception {
-    return defaultValue(delta, inputStreams);
-  }
-
-  public T visitExchange(Exchange exchange, List<T> inputStreams) throws 
Exception {
-    return defaultValue(exchange, inputStreams);
-  }
-
-  public T visitProject(Project project, List<T> inputStreams) throws 
Exception {
-    return defaultValue(project, inputStreams);
-  }
-
-  public T visitFilter(Filter filter, List<T> inputStreams) throws Exception {
-    return defaultValue(filter, inputStreams);
-  }
-
-  public T visitSample(Sample sample, List<T> inputStreams) throws Exception {
-    return defaultValue(sample, inputStreams);
-  }
-
-  public T visitSort(Sort sort, List<T> inputStreams) throws Exception {
-    return defaultValue(sort, inputStreams);
-  }
-
-  public T visitTableModify(TableModify modify, List<T> inputStreams) throws 
Exception {
-    return defaultValue(modify, inputStreams);
-  }
-
-  public T visitTableScan(TableScan scan, List<T> inputStreams) throws 
Exception {
-    return defaultValue(scan, inputStreams);
-  }
-
-  public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws 
Exception {
-    return defaultValue(uncollect, inputStreams);
-  }
-
-  public T visitWindow(Window window, List<T> inputStreams) throws Exception {
-    return defaultValue(window, inputStreams);
-  }
-
-  public T visitJoin(Join join, List<T> inputStreams) throws Exception {
-    return defaultValue(join, inputStreams);
-  }
-
-  public T defaultValue(RelNode n, List<T> inputStreams) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
 
b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
deleted file mode 100644
index 97995c7..0000000
--- 
a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.calcite.rel.stream.Delta;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.AggregateFunction;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Compile RelNodes into individual functions.
- */
-class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
-  public static Joiner NEW_LINE_JOINER = Joiner.on('\n');
-
-  private final PrintWriter pw;
-  private final JavaTypeFactory typeFactory;
-  private final RexNodeToJavaCodeCompiler rexCompiler;
-
-  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-    "  private static final ChannelHandler %1$s = ",
-    "    new AbstractChannelHandler() {",
-    "    @Override",
-    "    public void dataReceived(ChannelContext ctx, Values _data) {",
-    ""
-  );
-
-  private static final String AGGREGATE_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-          "  private static final ChannelHandler %1$s = ",
-          "    new AbstractChannelHandler() {",
-          "    private final Values EMPTY_VALUES = new Values();",
-          "    private final Map<List<Object>, Map<String, Object>> state = 
new LinkedHashMap<>();",
-          "    private final int[] groupIndices = new int[] {%2$s};",
-          "    private List<Object> getGroupValues(Values _data) {",
-          "      List<Object> res = new ArrayList<>();",
-          "      for (int i: groupIndices) {",
-          "        res.add(_data.get(i));",
-          "      }",
-          "      return res;",
-          "    }",
-          "",
-          "    @Override",
-          "    public void flush(ChannelContext ctx) {",
-          "      emitAggregateResults(ctx);",
-          "      super.flush(ctx);",
-          "      state.clear();",
-          "    }",
-          "",
-          "    private void emitAggregateResults(ChannelContext ctx) {",
-          "        for (Map.Entry<List<Object>, Map<String, Object>> entry: 
state.entrySet()) {",
-          "          List<Object> groupValues = entry.getKey();",
-          "          Map<String, Object> accumulators = entry.getValue();",
-          "          %3$s",
-          "        }",
-          "    }",
-          "",
-          "    @Override",
-          "    public void dataReceived(ChannelContext ctx, Values _data) {",
-          ""
-  );
-
-  private static final String JOIN_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-          "  private static final ChannelHandler %1$s = ",
-          "    new AbstractChannelHandler() {",
-          "      Object left = %2$s;",
-          "      Object right = %3$s;",
-          "      Object source = null;",
-          "      List<Values> leftRows = new ArrayList<>();",
-          "      List<Values> rightRows = new ArrayList<>();",
-          "      boolean leftDone = false;",
-          "      boolean rightDone = false;",
-          "      int[] ordinals = new int[] {%4$s, %5$s};",
-          "",
-          "      Multimap<Object, Values> getJoinTable(List<Values> rows, int 
joinIndex) {",
-          "         Multimap<Object, Values> m = ArrayListMultimap.create();",
-          "         for(Values v: rows) {",
-          "           m.put(v.get(joinIndex), v);",
-          "         }",
-          "         return m;",
-          "      }",
-          "",
-          "      List<Values> join(Multimap<Object, Values> tab, List<Values> 
rows, int rowIdx, boolean rev) {",
-          "         List<Values> res = new ArrayList<>();",
-          "         for (Values row: rows) {",
-          "           for (Values mapValue: tab.get(row.get(rowIdx))) {",
-          "             if (mapValue != null) {",
-          "               Values joinedRow = new Values();",
-          "               if(rev) {",
-          "                 joinedRow.addAll(row);",
-          "                 joinedRow.addAll(mapValue);",
-          "               } else {",
-          "                 joinedRow.addAll(mapValue);",
-          "                 joinedRow.addAll(row);",
-          "               }",
-          "               res.add(joinedRow);",
-          "             }",
-          "           }",
-          "         }",
-          "         return res;",
-          "      }",
-          "",
-          "    @Override",
-          "    public void setSource(ChannelContext ctx, Object source) {",
-          "      this.source = source;",
-          "    }",
-          "",
-          "    @Override",
-          "    public void flush(ChannelContext ctx) {",
-          "        if (source == left) {",
-          "            leftDone = true;",
-          "        } else if (source == right) {",
-          "            rightDone = true;",
-          "        }",
-          "        if (leftDone && rightDone) {",
-          "          if (leftRows.size() <= rightRows.size()) {",
-          "            for(Values res: join(getJoinTable(leftRows, 
ordinals[0]), rightRows, ordinals[1], false)) {",
-          "              ctx.emit(res);",
-          "            }",
-          "          } else {",
-          "            for(Values res: join(getJoinTable(rightRows, 
ordinals[1]), leftRows, ordinals[0], true)) {",
-          "              ctx.emit(res);",
-          "            }",
-          "          }",
-          "          leftDone = rightDone = false;",
-          "          leftRows.clear();",
-          "          rightRows.clear();",
-          "          super.flush(ctx);",
-          "        }",
-          "    }",
-          "",
-          "    @Override",
-          "    public void dataReceived(ChannelContext ctx, Values _data) {",
-          ""
-  );
-
-  private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
-      "  private static final ChannelHandler %1$s = 
AbstractChannelHandler.PASS_THROUGH;",
-      "");
-
-  private static final String STAGE_ENUMERABLE_TABLE_SCAN = 
NEW_LINE_JOINER.join(
-          "  private static final ChannelHandler %1$s = new 
AbstractChannelHandler() {",
-          "    @Override",
-          "    public void flush(ChannelContext ctx) {",
-          "      ctx.setSource(this);",
-          "      super.flush(ctx);",
-          "    }",
-          "",
-          "    @Override",
-          "    public void dataReceived(ChannelContext ctx, Values _data) {",
-          "      ctx.setSource(this);",
-          "      ctx.emit(_data);",
-          "    }",
-          "  };",
-          "");
-
-  private int nameCount;
-  private Map<AggregateCall, String> aggregateCallVarNames = new HashMap<>();
-
-  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
-    this.pw = pw;
-    this.typeFactory = typeFactory;
-    this.rexCompiler = new RexNodeToJavaCodeCompiler(new 
RexBuilder(typeFactory));
-  }
-
-  @Override
-  public Void visitDelta(Delta delta, List<Void> inputStreams) throws 
Exception {
-    pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
-    return null;
-  }
-
-  @Override
-  public Void visitFilter(Filter filter, List<Void> inputStreams) throws 
Exception {
-    beginStage(filter);
-
-    List<RexNode> childExps = filter.getChildExps();
-    RelDataType inputRowType = filter.getInput(0).getRowType();
-
-    pw.print("Context context = new StormContext(Processor.dataContext);\n");
-    pw.print("context.values = _data.toArray();\n");
-    pw.print("Object[] outputValues = new Object[1];\n");
-
-    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
-
-    String r = "((Boolean) outputValues[0])";
-    if (filter.getCondition().getType().isNullable()) {
-      pw.print(String.format("    if (%s != null && %s) { ctx.emit(_data); 
}\n", r, r));
-    } else {
-      pw.print(String.format("    if (%s) { ctx.emit(_data); }\n", r, r));
-    }
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void visitProject(Project project, List<Void> inputStreams) throws 
Exception {
-    beginStage(project);
-
-    List<RexNode> childExps = project.getChildExps();
-    RelDataType inputRowType = project.getInput(0).getRowType();
-    int outputCount = project.getRowType().getFieldCount();
-
-    pw.print("Context context = new StormContext(Processor.dataContext);\n");
-    pw.print("context.values = _data.toArray();\n");
-    pw.print(String.format("Object[] outputValues = new Object[%d];\n", 
outputCount));
-
-    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
-
-    pw.print("    ctx.emit(new Values(outputValues));\n");
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void defaultValue(RelNode n, List<Void> inputStreams) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws 
Exception {
-    pw.print(String.format(STAGE_ENUMERABLE_TABLE_SCAN, getStageName(scan)));
-    return null;
-  }
-
-  @Override
-  public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) 
throws Exception {
-    beginAggregateStage(aggregate);
-    pw.println("        if (_data != null) {");
-    pw.println("        List<Object> curGroupValues = getGroupValues(_data);");
-    pw.println("        if (!state.containsKey(curGroupValues)) {");
-    pw.println("          state.put(curGroupValues, new HashMap<String, 
Object>());");
-    pw.println("        }");
-    pw.println("        Map<String, Object> accumulators = 
state.get(curGroupValues);");
-    for (AggregateCall call : aggregate.getAggCallList()) {
-      aggregate(call);
-    }
-    pw.println("        }");
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void visitJoin(Join join, List<Void> inputStreams) {
-    beginJoinStage(join);
-    pw.println("        if (source == left) {");
-    pw.println("            leftRows.add(_data);");
-    pw.println("        } else if (source == right) {");
-    pw.println("            rightRows.add(_data);");
-    pw.println("        }");
-    endStage();
-    return null;
-  }
-
-  private String groupValueEmitStr(String var, int n) {
-    int count = 0;
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < n; i++) {
-      if (++count > 1) {
-        sb.append(", ");
-      }
-      sb.append(var).append(".").append("get(").append(i).append(")");
-    }
-    return sb.toString();
-  }
-
-  private String emitAggregateStmts(Aggregate aggregate) {
-    List<String> res = new ArrayList<>();
-    StringWriter sw = new StringWriter();
-    for (AggregateCall call : aggregate.getAggCallList()) {
-      res.add(aggregateResult(call, new PrintWriter(sw)));
-    }
-    return NEW_LINE_JOINER.join(sw.toString(),
-                                String.format("          ctx.emit(new 
Values(%s, %s));",
-                                              groupValueEmitStr("groupValues", 
aggregate.getGroupSet().cardinality()),
-                                              Joiner.on(", ").join(res)));
-  }
-
-  private String aggregateResult(AggregateCall call, PrintWriter pw) {
-    SqlAggFunction aggFunction = call.getAggregation();
-    String aggregationName = call.getAggregation().getName();
-    Type ty = typeFactory.getJavaClass(call.getType());
-    String result;
-    if (aggFunction instanceof SqlUserDefinedAggFunction) {
-      AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) 
aggFunction).function;
-      result = doAggregateResult((AggregateFunctionImpl) aggregateFunction, 
reserveAggVarName(call), ty, pw);
-    } else {
-      List<BuiltinAggregateFunctions.TypeClass> typeClasses = 
BuiltinAggregateFunctions.TABLE.get(aggregationName);
-      if (typeClasses == null) {
-        throw new UnsupportedOperationException(aggregationName + " Not 
implemented");
-      }
-      result = 
doAggregateResult(AggregateFunctionImpl.create(findMatchingClass(aggregationName,
 typeClasses, ty)),
-                                 reserveAggVarName(call), ty, pw);
-    }
-    return result;
-  }
-
-  private String doAggregateResult(AggregateFunctionImpl aggFn, String 
varName, Type ty, PrintWriter pw) {
-    String resultName = varName + "_result";
-    Class<?> accumulatorType = aggFn.accumulatorType;
-    Class<?> resultType = aggFn.resultType;
-    List<String> args = new ArrayList<>();
-    if (!aggFn.isStatic) {
-      String aggObjName = String.format("%s_obj", varName);
-      String aggObjClassName = 
aggFn.initMethod.getDeclaringClass().getCanonicalName();
-      pw.println("          @SuppressWarnings(\"unchecked\")");
-      pw.println(String.format("          final %1$s %2$s = (%1$s) 
accumulators.get(\"%2$s\");", aggObjClassName,
-              aggObjName));
-      args.add(aggObjName);
-    }
-    args.add(String.format("(%s)accumulators.get(\"%s\")", 
accumulatorType.getCanonicalName(), varName));
-    pw.println(String.format("          final %s %s = %s;", 
resultType.getCanonicalName(),
-                             resultName, printMethodCall(aggFn.resultMethod, 
args)));
-
-    return resultName;
-  }
-
-  private void aggregate(AggregateCall call) {
-    SqlAggFunction aggFunction = call.getAggregation();
-    String aggregationName = call.getAggregation().getName();
-    Type ty = typeFactory.getJavaClass(call.getType());
-    if (call.getArgList().size() != 1) {
-      if (aggregationName.equals("COUNT")) {
-        if (call.getArgList().size() != 0) {
-          throw new UnsupportedOperationException("Count with nullable 
fields");
-        }
-      }
-    }
-    if (aggFunction instanceof SqlUserDefinedAggFunction) {
-      AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) 
aggFunction).function;
-      doAggregate((AggregateFunctionImpl) aggregateFunction, 
reserveAggVarName(call), ty, call.getArgList());
-    } else {
-      List<BuiltinAggregateFunctions.TypeClass> typeClasses = 
BuiltinAggregateFunctions.TABLE.get(aggregationName);
-      if (typeClasses == null) {
-        throw new UnsupportedOperationException(aggregationName + " Not 
implemented");
-      }
-      
doAggregate(AggregateFunctionImpl.create(findMatchingClass(aggregationName, 
typeClasses, ty)),
-                  reserveAggVarName(call), ty, call.getArgList());
-    }
-  }
-
-  private Class<?> findMatchingClass(String aggregationName, 
List<BuiltinAggregateFunctions.TypeClass> typeClasses, Type ty) {
-    for (BuiltinAggregateFunctions.TypeClass typeClass : typeClasses) {
-      if 
(typeClass.ty.equals(BuiltinAggregateFunctions.TypeClass.GenericType.class) || 
typeClass.ty.equals(ty)) {
-        return typeClass.clazz;
-      }
-    }
-    throw new UnsupportedOperationException(aggregationName + " Not implemeted 
for type '" + ty + "'");
-  }
-
-  private void doAggregate(AggregateFunctionImpl aggFn, String varName, Type 
ty, List<Integer> argList) {
-    List<String> args = new ArrayList<>();
-    Class<?> accumulatorType = aggFn.accumulatorType;
-    if (!aggFn.isStatic) {
-      String aggObjName = String.format("%s_obj", varName);
-      String aggObjClassName = 
aggFn.initMethod.getDeclaringClass().getCanonicalName();
-      pw.println(String.format("          if 
(!accumulators.containsKey(\"%s\")) { ", aggObjName));
-      pw.println(String.format("            accumulators.put(\"%s\", new 
%s());", aggObjName, aggObjClassName));
-      pw.println("          }");
-      pw.println("          @SuppressWarnings(\"unchecked\")");
-      pw.println(String.format("          final %1$s %2$s = (%1$s) 
accumulators.get(\"%2$s\");", aggObjClassName,
-              aggObjName));
-      args.add(aggObjName);
-    }
-    args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
-                           "accumulators.get(\"" + varName + "\")",
-                           printMethodCall(aggFn.initMethod, args),
-                           accumulatorType.getCanonicalName()));
-    if (argList.isEmpty()) {
-      args.add("EMPTY_VALUES");
-    } else {
-      for (int i = 0; i < aggFn.valueTypes.size(); i++) {
-        args.add(String.format("(%s) %s", 
aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + 
")"));
-      }
-    }
-    pw.print(String.format("          accumulators.put(\"%s\", %s);\n",
-                           varName,
-                           printMethodCall(aggFn.addMethod, args)));
-  }
-
-  private String reserveAggVarName(AggregateCall call) {
-    String varName;
-    if ((varName = aggregateCallVarNames.get(call)) == null) {
-      varName = call.getAggregation().getName() + ++nameCount;
-      aggregateCallVarNames.put(call, varName);
-    }
-    return varName;
-  }
-
-  private void beginStage(RelNode n) {
-    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
-  }
-
-  private void beginAggregateStage(Aggregate n) {
-    pw.print(String.format(AGGREGATE_STAGE_PROLOGUE, getStageName(n), 
getGroupByIndices(n), emitAggregateStmts(n)));
-  }
-
-  private void beginJoinStage(Join join) {
-    int[] ordinals = new int[2];
-    if (!RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, ordinals)) {
-      throw new UnsupportedOperationException("Only simple equi joins are 
supported");
-    }
-
-    pw.print(String.format(JOIN_STAGE_PROLOGUE, getStageName(join),
-                           getStageName(join.getLeft()),
-                           getStageName(join.getRight()),
-                           ordinals[0],
-                           ordinals[1]));
-  }
-
-  private void endStage() {
-    pw.print("  }\n  };\n");
-  }
-
-  static String getStageName(RelNode n) {
-    return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
-  }
-
-  private String getGroupByIndices(Aggregate n) {
-    StringBuilder res = new StringBuilder();
-    int count = 0;
-    for (int i : n.getGroupSet()) {
-      if (++count > 1) {
-        res.append(", ");
-      }
-      res.append(i);
-    }
-    return res.toString();
-  }
-
-  public static String printMethodCall(Method method, List<String> args) {
-    return printMethodCall(method.getDeclaringClass(), method.getName(),
-            Modifier.isStatic(method.getModifiers()), args);
-  }
-
-  private static String printMethodCall(Class<?> clazz, String method, boolean 
isStatic, List<String> args) {
-    if (isStatic) {
-      return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, 
Joiner.on(',').join(args));
-    } else {
-      return String.format("%s.%s(%s)", args.get(0), method,
-              Joiner.on(',').join(args.subList(1, args.size())));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
new file mode 100644
index 0000000..5cd4054
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql;
+
+import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
+
+import java.util.concurrent.Callable;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.utils.Utils;
+
+public final class SqlTestUtil {
+
+    public static void runTridentTopology(LocalCluster cluster, final int 
expectedValueSize, AbstractTridentProcessor proc,
+                                    TridentTopology topo) throws Exception {
+        final Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+
+        if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 
0) {
+            CompilingClassLoader lastClassloader = 
proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
+            Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
+        }
+
+        try (LocalCluster.LocalTopology stormTopo = 
cluster.submitTopology("storm-sql", conf, topo.build())) {
+            waitForCompletion(1000 * 1000, new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return getCollectedValues().size() < expectedValueSize;
+                }
+            });
+        } finally {
+            while(cluster.getClusterInfo().get_topologies_size() > 0) {
+                Thread.sleep(10);
+            }
+            Utils.resetClassLoaderForJavaDeSerialize();
+        }
+    }
+
+    private static void waitForCompletion(long timeout, Callable<Boolean> 
cond) throws Exception {
+        long start = TestUtils.monotonicNow();
+        while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
+            Thread.sleep(100);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
new file mode 100644
index 0000000..b3acee5
--- /dev/null
+++ 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql;
+
+import java.util.function.Predicate;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.parser.SqlCreateFunction;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.parser.StormParser;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.utils.Utils;
+
+public class StormSqlLocalClusterImpl {
+    private final StormSqlContext sqlContext;
+
+    public StormSqlLocalClusterImpl() {
+        sqlContext = new StormSqlContext();
+    }
+
+    public void runLocal(LocalCluster localCluster, Iterable<String> 
statements,
+                         Predicate<Void> waitCondition, long waitTimeoutMs) 
throws Exception {
+        final Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+
+        for (String sql : statements) {
+            StormParser parser = new StormParser(sql);
+            SqlNode node = parser.impl().parseSqlStmtEof();
+            if (node instanceof SqlCreateTable) {
+                sqlContext.interpretCreateTable((SqlCreateTable) node);
+            } else if (node instanceof SqlCreateFunction) {
+                sqlContext.interpretCreateFunction((SqlCreateFunction) node);
+            } else {
+                AbstractTridentProcessor processor = 
sqlContext.compileSql(sql);
+                TridentTopology topo = processor.build();
+
+                if (processor.getClassLoaders() != null && 
processor.getClassLoaders().size() > 0) {
+                    CompilingClassLoader lastClassloader = 
processor.getClassLoaders().get(processor.getClassLoaders().size() - 1);
+                    Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
+                }
+
+                try (LocalCluster.LocalTopology stormTopo = 
localCluster.submitTopology("storm-sql", conf, topo.build())) {
+                    waitForCompletion(waitTimeoutMs, waitCondition);
+                } finally {
+                    while(localCluster.getClusterInfo().get_topologies_size() 
> 0) {
+                        Thread.sleep(10);
+                    }
+                    Utils.resetClassLoaderForJavaDeSerialize();
+                }
+            }
+        }
+    }
+
+    private static void waitForCompletion(long timeout, Predicate<Void> cond) 
throws Exception {
+        long start = TestUtils.monotonicNow();
+        while (TestUtils.monotonicNow() - start < timeout && !cond.test(null)) 
{
+            Thread.sleep(100);
+        }
+    }
+}

Reply via email to