Repository: flink Updated Branches: refs/heads/release-1.1 9c87f92cb -> 5731672e5
[FLINK-4581] [table] Fix Table API throwing "No suitable driver found for jdbc:calcite" Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5731672e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5731672e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5731672e Branch: refs/heads/release-1.1 Commit: 5731672e5f7134d1a9ab01e38b7a861d1ebf0a68 Parents: 9c87f92 Author: twalthr <[email protected]> Authored: Fri Sep 16 11:41:15 2016 +0200 Committer: Fabian Hueske <[email protected]> Committed: Sat Oct 22 10:15:04 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/FlinkRelBuilder.scala | 36 +++++++------------- 1 file changed, 12 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5731672e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala index e3bb97e..c759deb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala @@ -18,24 +18,28 @@ package org.apache.flink.api.table +import java.util.Collections + import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema} -import org.apache.calcite.prepare.CalciteCatalogReader -import org.apache.calcite.rex.RexBuilder import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.tools.Frameworks.PlannerAction -import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} +import org.apache.calcite.plan._ +import org.apache.calcite.plan.volcano.VolcanoPlanner +import org.apache.calcite.prepare.CalciteCatalogReader +import org.apache.calcite.rex.RexBuilder +import org.apache.calcite.tools.{FrameworkConfig, RelBuilder} /** * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]]. */ class FlinkRelBuilder( context: Context, - cluster: RelOptCluster, + relOptCluster: RelOptCluster, relOptSchema: RelOptSchema) extends RelBuilder( context, - cluster, + relOptCluster, relOptSchema) { def getPlanner = cluster.getPlanner @@ -49,36 +53,20 @@ class FlinkRelBuilder( object FlinkRelBuilder { def create(config: FrameworkConfig): FlinkRelBuilder = { - // prepare planner and collect context instances - val clusters: Array[RelOptCluster] = Array(null) - val relOptSchemas: Array[RelOptSchema] = Array(null) - val rootSchemas: Array[SchemaPlus] = Array(null) - Frameworks.withPlanner(new PlannerAction[Void] { - override def apply( - cluster: RelOptCluster, - relOptSchema: RelOptSchema, - rootSchema: SchemaPlus) - : Void = { - clusters(0) = cluster - relOptSchemas(0) = relOptSchema - rootSchemas(0) = rootSchema - null - } - }) - val planner = clusters(0).getPlanner - val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader] // create Flink type factory val typeSystem = config.getTypeSystem val typeFactory = new FlinkTypeFactory(typeSystem) // create context instances with Flink type factory + val planner = new VolcanoPlanner(Contexts.empty()) + planner.addRelTraitDef(ConventionTraitDef.INSTANCE) val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory)) val calciteSchema = CalciteSchema.from(config.getDefaultSchema) val relOptSchema = new CalciteCatalogReader( calciteSchema, config.getParserConfig.caseSensitive(), - defaultRelOptSchema.getSchemaName, + Collections.emptyList(), typeFactory) new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
