godfreyhe commented on a change in pull request #7993: [FLINK-11932] 
[table-planner-blink] Add support for generating optimized logical plan for 
'select * from mytable'
URL: https://github.com/apache/flink/pull/7993#discussion_r266284119
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
 ##########
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, 
FlinkChainedProgram, StreamOptimizeContext}
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.config.{CalciteConnectionConfig, 
CalciteConnectionConfigImpl, CalciteConnectionProperty}
+import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql2rel.SqlToRelConverter
+
+import java.util.Properties
+
+/**
+  * Builder for creating a [[CalciteConfig]].
+  *
+  * Every setter checks its argument with `Preconditions.checkNotNull`,
+  * stores it, and returns this builder so that calls can be chained.
+  * Optional settings that are never set are built as `None`.
+  */
+class CalciteConfigBuilder {
+
+  /**
+    * Custom optimize program for batch table plans; stays `None` until
+    * `replaceBatchProgram` is called.
+    */
+  private var batchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]] 
= None
+
+  /**
+    * Custom optimize program for stream table plans; stays `None` until
+    * `replaceStreamProgram` is called.
+    */
+  private var streamProgram: 
Option[FlinkChainedProgram[StreamOptimizeContext]] = None
+
+  /**
+    * State of the custom SQL operator tables. `replaceOperatorTable` is
+    * set to true once `replaceSqlOperatorTable` has been called.
+    * `operatorTables` holds the tables supplied so far, most recently
+    * added first.
+    */
+  private var replaceOperatorTable: Boolean = false
+  private var operatorTables: List[SqlOperatorTable] = Nil
+
+  /**
+    * Replaces the default batch table optimize program with the given
+    * program. The program is checked for null; returns this builder.
+    */
+  def replaceBatchProgram(
+      program: FlinkChainedProgram[BatchOptimizeContext]): 
CalciteConfigBuilder = {
+    Preconditions.checkNotNull(program)
+    batchProgram = Some(program)
+    this
+  }
+
+  /**
+    * Replaces the default stream table optimize program with the given
+    * program. The program is checked for null; returns this builder.
+    */
+  def replaceStreamProgram(
+      program: FlinkChainedProgram[StreamOptimizeContext]): 
CalciteConfigBuilder = {
+    Preconditions.checkNotNull(program)
+    streamProgram = Some(program)
+    this
+  }
+
+  /**
+    * Custom SQL parser configuration; stays `None` until the method
+    * `replaceSqlParserConfig` is called.
+    */
+  private var replaceSqlParserConfig: Option[SqlParser.Config] = None
+
+  /**
+    * Custom configuration for SqlToRelConverter; stays `None` until the
+    * method `replaceSqlToRelConverterConfig` is called.
+    */
+  private var replaceSqlToRelConverterConfig: Option[SqlToRelConverter.Config] 
= None
+
+  /**
+    * Replaces the built-in SQL operator table with the given table.
+    *
+    * Any tables registered on this builder before this call are
+    * discarded, and the built configuration reports
+    * `replacesSqlOperatorTable` as true.
+    */
+  def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): 
CalciteConfigBuilder = {
+    Preconditions.checkNotNull(replaceSqlOperatorTable)
+    operatorTables = List(replaceSqlOperatorTable)
+    replaceOperatorTable = true
+    this
+  }
+
+  /**
+    * Adds the given table to the custom SQL operator tables.
+    *
+    * The table is put at the head of the list, so a table added later
+    * comes before the earlier ones when `build()` chains them.
+    * NOTE(review): how the result is combined with the built-in table
+    * is decided by the consumer of [[CalciteConfig]] - confirm there.
+    */
+  def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): 
CalciteConfigBuilder = {
+    Preconditions.checkNotNull(addedSqlOperatorTable)
+    this.operatorTables = addedSqlOperatorTable :: this.operatorTables
+    this
+  }
+
+  /**
+    * Replaces the built-in SQL parser configuration with the given
+    * configuration. The configuration is checked for null; returns
+    * this builder.
+    */
+  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): 
CalciteConfigBuilder = {
+    Preconditions.checkNotNull(sqlParserConfig)
+    replaceSqlParserConfig = Some(sqlParserConfig)
+    this
+  }
+
+  def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): 
CalciteConfigBuilder = {
+    Preconditions.checkNotNull(config)
+    replaceSqlToRelConverterConfig = Some(config)
+    this
+  }
+
+  private class CalciteConfigImpl(
+      val getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]],
+      val getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]],
+      val getSqlOperatorTable: Option[SqlOperatorTable],
+      val replacesSqlOperatorTable: Boolean,
+      val getSqlParserConfig: Option[SqlParser.Config],
+      val getSqlToRelConverterConfig: Option[SqlToRelConverter.Config])
+    extends CalciteConfig {
+
+  }
+
+  /**
+    * Builds a new [[CalciteConfig]] from the current builder state.
+    *
+    * The operator tables collapse to `None` when none were supplied, to
+    * the single table when exactly one was supplied, and otherwise to
+    * one chained table covering all of them.
+    */
+  def build(): CalciteConfig = new CalciteConfigImpl(
+    batchProgram,
+    streamProgram,
+    operatorTables match {
+      case Nil => None
+      case h :: Nil => Some(h)
+      case _ =>
+        // two or more tables: fold them pairwise, list head first, into nested chained tables
+        Some(operatorTables.reduce((x, y) => ChainedSqlOperatorTable.of(x, y)))
+    },
+    this.replaceOperatorTable,
+    replaceSqlParserConfig,
+    replaceSqlToRelConverterConfig)
+}
+
+/**
+  * Custom Calcite configuration for the Table and SQL API.
+  *
+  * Every optional setting is exposed as an `Option`.
+  */
+trait CalciteConfig {
+
+  /**
+    * Returns a custom batch table optimize program, if any.
+    */
+  def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]
+
+  /**
+    * Returns a custom stream table optimize program, if any.
+    */
+  def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]
+
+  /**
+    * Returns whether this configuration replaces the built-in SQL
+    * operator table.
+    */
+  def replacesSqlOperatorTable: Boolean
+
+  /**
+    * Returns a custom SQL operator table, if any.
+    */
+  def getSqlOperatorTable: Option[SqlOperatorTable]
+
+  /**
+    * Returns a custom SQL parser configuration, if any.
+    */
+  def getSqlParserConfig: Option[SqlParser.Config]
+
+  /**
+    * Returns a custom configuration for SqlToRelConverter, if any.
+    */
+  def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]
+}
+
+object CalciteConfig {
 
 Review comment:
   From my understanding, this class could be simplified into a plain class 
like `TableConfig`. Currently, the implementation style of `CalciteConfig` is 
the same as in the Flink community version. The Flink community has deprecated 
this class because the API is being decoupled from Calcite. We can refactor 
this class after the Flink community has finished that work.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to