Repository: flink Updated Branches: refs/heads/master 022ceb58b -> 11fa089d6
[FLINK-4639] [table] Introduce CalciteConfig to make Calcite features more pluggable. This closes #2521 This closes #1617 // closing PR after discussion Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc0b3430 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc0b3430 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc0b3430 Branch: refs/heads/master Commit: dc0b3430f8d243b1876125e4949e733ae757aa96 Parents: 022ceb5 Author: twalthr <[email protected]> Authored: Tue Sep 20 11:35:25 2016 +0200 Committer: Fabian Hueske <[email protected]> Committed: Tue Oct 25 12:59:18 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/BatchTableEnvironment.scala | 14 +- .../apache/flink/api/table/CalciteConfig.scala | 161 ++++++++++++++ .../api/table/StreamTableEnvironment.scala | 10 +- .../apache/flink/api/table/TableConfig.scala | 19 +- .../flink/api/table/TableEnvironment.scala | 91 ++++++-- .../api/java/batch/TableEnvironmentITCase.java | 16 ++ .../api/table/CalciteConfigBuilderTest.scala | 208 +++++++++++++++++++ .../flink/api/table/TableEnvironmentTest.scala | 5 +- 8 files changed, 496 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index 10c2450..1d34777 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -24,10 +24,9 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator -import org.apache.calcite.tools.Programs - +import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.{ExecutionEnvironment, DataSet} +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.table.explain.PlanJsonParser @@ -35,7 +34,7 @@ import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets -import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataSetTable} +import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable} import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.api.table.sources.BatchTableSource @@ -229,6 +228,11 @@ abstract class BatchTableEnvironment( } /** + * Returns the built-in rules that are defined by the environment. + */ + protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES + + /** * Generates the optimized [[RelNode]] tree from the original relational node tree. * * @param relNode The original [[RelNode]] tree @@ -240,7 +244,7 @@ abstract class BatchTableEnvironment( val decorPlan = RelDecorrelator.decorrelateQuery(relNode) // optimize the logical Flink plan - val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) + val optProgram = Programs.ofRules(getRuleSet) val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() val dataSetPlan = try { http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala new file mode 100644 index 0000000..06b3edc --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.calcite.plan.RelOptRule +import org.apache.calcite.sql.SqlOperatorTable +import org.apache.calcite.sql.parser.SqlParser +import org.apache.calcite.sql.util.ChainedSqlOperatorTable +import org.apache.calcite.tools.{RuleSets, RuleSet} +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConverters._ + +/** + * Builder for creating a Calcite configuration. + */ +class CalciteConfigBuilder { + private var replaceRules: Boolean = false + private var ruleSets: List[RuleSet] = Nil + + private var replaceOperatorTable: Boolean = false + private var operatorTables: List[SqlOperatorTable] = Nil + + private var replaceSqlParserConfig: Option[SqlParser.Config] = None + + /** + * Replaces the built-in rule set with the given rule set. + */ + def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = { + Preconditions.checkNotNull(replaceRuleSet) + ruleSets = List(replaceRuleSet) + replaceRules = true + this + } + + /** + * Appends the given rule set to the built-in rule set. + */ + def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = { + Preconditions.checkNotNull(addedRuleSet) + ruleSets = addedRuleSet :: ruleSets + this + } + + /** + * Replaces the built-in SQL operator table with the given table. + */ + def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = { + Preconditions.checkNotNull(replaceSqlOperatorTable) + operatorTables = List(replaceSqlOperatorTable) + replaceOperatorTable = true + this + } + + /** + * Appends the given table to the built-in SQL operator table. + */ + def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = { + Preconditions.checkNotNull(addedSqlOperatorTable) + this.operatorTables = addedSqlOperatorTable :: this.operatorTables + this + } + + /** + * Replaces the built-in SQL parser configuration with the given configuration. + */ + def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = { + Preconditions.checkNotNull(sqlParserConfig) + replaceSqlParserConfig = Some(sqlParserConfig) + this + } + + private class CalciteConfigImpl( + val getRuleSet: Option[RuleSet], + val replacesRuleSet: Boolean, + val getSqlOperatorTable: Option[SqlOperatorTable], + val replacesSqlOperatorTable: Boolean, + val getSqlParserConfig: Option[SqlParser.Config]) + extends CalciteConfig + + /** + * Builds a new [[CalciteConfig]]. + */ + def build(): CalciteConfig = new CalciteConfigImpl( + ruleSets match { + case Nil => None + case h :: Nil => Some(h) + case _ => + // concat rule sets + val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])( (c, r) => r.asScala ++ c) + Some(RuleSets.ofList(concatRules.asJava)) + }, + this.replaceRules, + operatorTables match { + case Nil => None + case h :: Nil => Some(h) + case _ => + // chain operator tables + Some(operatorTables.reduce( (x, y) => ChainedSqlOperatorTable.of(x, y))) + }, + this.replaceOperatorTable, + replaceSqlParserConfig) +} + +/** + * Calcite configuration for defining a custom Calcite configuration for Table and SQL API. + */ +trait CalciteConfig { + /** + * Returns whether this configuration replaces the built-in rule set. + */ + def replacesRuleSet: Boolean + + /** + * Returns a custom rule set. + */ + def getRuleSet: Option[RuleSet] + + /** + * Returns whether this configuration replaces the built-in SQL operator table. + */ + def replacesSqlOperatorTable: Boolean + + /** + * Returns a custom SQL operator table. + */ + def getSqlOperatorTable: Option[SqlOperatorTable] + + /** + * Returns a custom SQL parser configuration. + */ + def getSqlParserConfig: Option[SqlParser.Config] +} + +object CalciteConfig { + + val DEFAULT = createBuilder().build() + + /** + * Creates a new builder for constructing a [[CalciteConfig]]. + */ + def createBuilder(): CalciteConfigBuilder = { + new CalciteConfigBuilder + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 15e3960..ac21834 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -24,8 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator -import org.apache.calcite.tools.Programs - +import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} @@ -204,6 +203,11 @@ abstract class StreamTableEnvironment( } /** + * Returns the built-in rules that are defined by the environment. + */ + protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES + + /** * Generates the optimized [[RelNode]] tree from the original relational node tree. * * @param relNode The root node of the relational expression tree. @@ -214,7 +218,7 @@ abstract class StreamTableEnvironment( val decorPlan = RelDecorrelator.decorrelateQuery(relNode) // optimize the logical Flink plan - val optProgram = Programs.ofRules(FlinkRuleSets.DATASTREAM_OPT_RULES) + val optProgram = Programs.ofRules(getRuleSet) val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify() val dataStreamPlan = try { http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala index c92451d..37d9cb5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala @@ -22,7 +22,7 @@ import java.util.TimeZone /** * A config to define the runtime behavior of the Table API. */ -class TableConfig extends Serializable { +class TableConfig { /** * Defines the timezone for date/time/timestamp conversions. @@ -41,6 +41,11 @@ class TableConfig extends Serializable { private var efficientTypeUsage = false /** + * Defines the configuration of Calcite for Table API and SQL queries. + */ + private var calciteConfig = CalciteConfig.DEFAULT + + /** * Sets the timezone for date/time/timestamp conversions. */ def setTimeZone(timeZone: TimeZone): Unit = { @@ -83,6 +88,18 @@ class TableConfig extends Serializable { this.efficientTypeUsage = efficientTypeUsage } + /** + * Returns the current configuration of Calcite for Table API and SQL queries. + */ + def getCalciteConfig: CalciteConfig = calciteConfig + + /** + * Sets the configuration of Calcite for Table API and SQL queries. + * Changing the configuration has no effect after the first query has been defined. + */ + def setCalciteConfig(calciteConfig: CalciteConfig): Unit = { + this.calciteConfig = calciteConfig + } } object TableConfig { http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index c3b728b..df97d2d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -29,7 +29,8 @@ import org.apache.calcite.schema.{Schemas, SchemaPlus} import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql.SqlOperatorTable import org.apache.calcite.sql.parser.SqlParser -import org.apache.calcite.tools.{FrameworkConfig, Frameworks} +import org.apache.calcite.sql.util.ChainedSqlOperatorTable +import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} @@ -46,6 +47,8 @@ import org.apache.flink.api.table.validate.FunctionCatalog import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} +import scala.collection.JavaConverters._ + /** * The abstract base class for batch and stream TableEnvironments. * @@ -53,42 +56,31 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala */ abstract class TableEnvironment(val config: TableConfig) { - // configure sql parser - // we use Java lex because back ticks are easier than double quotes in programming - // and cases are preserved - private val parserConfig = SqlParser - .configBuilder() - .setLex(Lex.JAVA) - .build() - // the catalog to hold all registered and translated tables private val tables: SchemaPlus = Frameworks.createRootSchema(true) // Table API/SQL function catalog private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns - // SQL operator and function catalog - private val sqlOperatorTable: SqlOperatorTable = functionCatalog.getSqlOperatorTable - // the configuration to create a Calcite planner - private val frameworkConfig: FrameworkConfig = Frameworks + private lazy val frameworkConfig: FrameworkConfig = Frameworks .newConfigBuilder .defaultSchema(tables) - .parserConfig(parserConfig) + .parserConfig(getSqlParserConfig) .costFactory(new DataSetCostFactory) .typeSystem(new FlinkTypeSystem) - .operatorTable(sqlOperatorTable) + .operatorTable(getSqlOperatorTable) // set the executor to evaluate constant expressions .executor(new RexExecutorImpl(Schemas.createDataContext(null))) .build // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. - protected val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig) + protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig) // the planner instance used to optimize queries of this TableEnvironment - private val planner: RelOptPlanner = relBuilder.getPlanner + private lazy val planner: RelOptPlanner = relBuilder.getPlanner - private val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory + private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory // a counter for unique attribute names private val attrNameCntr: AtomicInteger = new AtomicInteger(0) @@ -97,6 +89,69 @@ abstract class TableEnvironment(val config: TableConfig) { def getConfig = config /** + * Returns the operator table for this environment including a custom Calcite configuration. + */ + protected def getSqlOperatorTable: SqlOperatorTable = { + val calciteConfig = config.getCalciteConfig + calciteConfig.getSqlOperatorTable match { + + case None => + functionCatalog.getSqlOperatorTable + + case Some(table) => + if (calciteConfig.replacesSqlOperatorTable) { + table + } else { + ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, table) + } + } + } + + /** + * Returns the rule set for this environment including a custom Calcite configuration. + */ + protected def getRuleSet: RuleSet = { + val calciteConfig = config.getCalciteConfig + calciteConfig.getRuleSet match { + + case None => + getBuiltInRuleSet + + case Some(ruleSet) => + if (calciteConfig.replacesRuleSet) { + ruleSet + } else { + RuleSets.ofList((getBuiltInRuleSet.asScala ++ ruleSet.asScala).asJava) + } + } + } + + /** + * Returns the SQL parser config for this environment including a custom Calcite configuration. + */ + protected def getSqlParserConfig: SqlParser.Config = { + val calciteConfig = config.getCalciteConfig + calciteConfig.getSqlParserConfig match { + + case None => + // we use Java lex because back ticks are easier than double quotes in programming + // and cases are preserved + SqlParser + .configBuilder() + .setLex(Lex.JAVA) + .build() + + case Some(sqlParserConfig) => + sqlParserConfig + } + } + + /** + * Returns the built-in rules that are defined by the environment. + */ + protected def getBuiltInRuleSet: RuleSet + + /** * Registers a [[UserDefinedFunction]] under a unique name. Replaces already existing * user-defined functions under this name. */ http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java index 5e40724..2d662d6 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.calcite.tools.RuleSets; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.BatchTableEnvironment; @@ -33,6 +34,8 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; +import org.apache.flink.api.table.CalciteConfig; +import org.apache.flink.api.table.CalciteConfigBuilder; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; @@ -436,6 +439,19 @@ public class TableEnvironmentITCase extends TableProgramsTestBase { tableEnv.toDataSet(t, MyNonStatic.class); } + @Test(expected = TableException.class) + public void testCustomCalciteConfig() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + CalciteConfig cc = new CalciteConfigBuilder().replaceRuleSet(RuleSets.ofList()).build(); + tableEnv.getConfig().setCalciteConfig(cc); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + Table t = tableEnv.fromDataSet(ds); + tableEnv.toDataSet(t, Row.class); + } + // -------------------------------------------------------------------------------------------- public class MyNonStatic { http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala new file mode 100644 index 0000000..2b0d446 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.calcite.rel.rules.{CalcSplitRule, CalcMergeRule, FilterMergeRule} +import org.apache.calcite.sql.fun.{SqlStdOperatorTable, OracleSqlOperatorTable} +import org.apache.calcite.tools.RuleSets +import org.junit.Test +import org.junit.Assert._ + +import scala.collection.JavaConverters._ + +class CalciteConfigBuilderTest { + + @Test + def testDefaultRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .build() + + assertEquals(false, cc.replacesRuleSet) + assertFalse(cc.getRuleSet.isDefined) + } + + @Test + def testReplaceRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .build() + + assertEquals(true, cc.replacesRuleSet) + assertTrue(cc.getRuleSet.isDefined) + val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(1, cSet.size) + assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) + } + + @Test + def testReplaceAddRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE)) + .build() + + assertEquals(true, cc.replacesRuleSet) + assertTrue(cc.getRuleSet.isDefined) + val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(3, cSet.size) + assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) + assertTrue(cSet.contains(CalcMergeRule.INSTANCE)) + assertTrue(cSet.contains(CalcSplitRule.INSTANCE)) + } + + @Test + def testAddRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .build() + + assertEquals(false, cc.replacesRuleSet) + assertTrue(cc.getRuleSet.isDefined) + val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(1, cSet.size) + assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) + } + + @Test + def testAddAddRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE)) + .build() + + assertEquals(false, cc.replacesRuleSet) + assertTrue(cc.getRuleSet.isDefined) + val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(3, cSet.size) + assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) + assertTrue(cSet.contains(CalcMergeRule.INSTANCE)) + assertTrue(cSet.contains(CalcSplitRule.INSTANCE)) + } + + @Test + def testDefaultOperatorTable(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .build() + + assertEquals(false, cc.replacesSqlOperatorTable) + assertFalse(cc.getSqlOperatorTable.isDefined) + } + + def testReplaceOperatorTable(): Unit = { + + val oracleTable = new OracleSqlOperatorTable + + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceSqlOperatorTable(oracleTable) + .build() + + val oracleOps = oracleTable.getOperatorList.asScala + + assertEquals(true, cc.replacesSqlOperatorTable) + assertTrue(cc.getSqlOperatorTable.isDefined) + val ops = cc.getSqlOperatorTable.get.getOperatorList + .asScala.toSet + assertEquals(oracleOps.size, ops.size) + for (o <- oracleOps) { + assertTrue(ops.contains(o)) + } + } + + def testReplaceAddOperatorTable(): Unit = { + + val oracleTable = new OracleSqlOperatorTable + val stdTable = new SqlStdOperatorTable + + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceSqlOperatorTable(oracleTable) + .addSqlOperatorTable(stdTable) + .build() + + val oracleOps = oracleTable.getOperatorList.asScala + val stdOps = stdTable.getOperatorList.asScala + + assertEquals(true, cc.replacesSqlOperatorTable) + assertTrue(cc.getSqlOperatorTable.isDefined) + val ops = cc.getSqlOperatorTable.get.getOperatorList + .asScala.toSet + assertEquals(oracleOps.size + stdOps.size, ops.size) + for (o <- oracleOps) { + assertTrue(ops.contains(o)) + } + for (o <- stdOps) { + assertTrue(ops.contains(o)) + } + + } + + def testAddOperatorTable(): Unit = { + + val oracleTable = new OracleSqlOperatorTable + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addSqlOperatorTable(oracleTable) + .build() + + val oracleOps = oracleTable.getOperatorList.asScala + + assertEquals(false, cc.replacesSqlOperatorTable) + assertTrue(cc.getSqlOperatorTable.isDefined) + val ops = cc.getSqlOperatorTable.get.getOperatorList + .asScala.toSet + assertEquals(oracleOps.size, ops.size) + for (o <- oracleOps) { + assertTrue(ops.contains(o)) + } + } + + def testAddAddOperatorTable(): Unit = { + + val oracleTable = new OracleSqlOperatorTable + val stdTable = new SqlStdOperatorTable + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addSqlOperatorTable(oracleTable) + .addSqlOperatorTable(stdTable) + .build() + + val oracleOps = oracleTable.getOperatorList.asScala + val stdOps = stdTable.getOperatorList.asScala + + assertEquals(false, cc.replacesSqlOperatorTable) + assertTrue(cc.getSqlOperatorTable.isDefined) + val ops = cc.getSqlOperatorTable.get.getOperatorList + .asScala.toSet + assertEquals(oracleOps.size + stdOps.size, ops.size) + for (o <- oracleOps) { + assertTrue(ops.contains(o)) + } + for (o <- stdOps) { + assertTrue(ops.contains(o)) + } + + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala index 263696b..db86ef3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala @@ -18,10 +18,11 @@ package org.apache.flink.api.table +import org.apache.calcite.tools.RuleSet import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo} +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} import org.apache.flink.api.table.expressions.{Alias, UnresolvedFieldReference} import org.apache.flink.api.table.sinks.TableSink import org.junit.Test @@ -279,6 +280,8 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { override protected def checkValidTableName(name: String): Unit = ??? + override protected def getBuiltInRuleSet: RuleSet = ??? + override def sql(query: String): Table = ??? }
