Repository: flink Updated Branches: refs/heads/master ffb056072 -> 86d32ac84
[FLINK-6037] [table] Metadata provider didn't work in SQL This closes #3559. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86d32ac8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86d32ac8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86d32ac8 Branch: refs/heads/master Commit: 86d32ac847d3f5663f350210cecc70205b9fa0b9 Parents: ffb0560 Author: jingzhang <[email protected]> Authored: Sun Mar 19 15:29:41 2017 +0800 Committer: twalthr <[email protected]> Committed: Tue Mar 21 11:51:45 2017 +0100 ---------------------------------------------------------------------- .../flink/table/calcite/FlinkPlannerImpl.scala | 5 ++- .../flink/table/calcite/FlinkRelBuilder.scala | 7 +--- .../calcite/FlinkRelOptClusterFactory.scala | 42 ++++++++++++++++++++ 3 files changed, 46 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/86d32ac8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index e08d313..4f3e317 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -36,6 +36,7 @@ import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter} import org.apache.calcite.tools.{FrameworkConfig, RelConversionException} import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException} import org.apache.flink.table.calcite.sql2rel.FlinkRelDecorrelator +import 
org.apache.flink.table.plan.cost.FlinkDefaultRelMetadataProvider import scala.collection.JavaConversions._ @@ -99,7 +100,7 @@ class FlinkPlannerImpl( try { assert(validatedSqlNode != null) val rexBuilder: RexBuilder = createRexBuilder - val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + val cluster: RelOptCluster = FlinkRelOptClusterFactory.create(planner, rexBuilder) val config = SqlToRelConverter.configBuilder() .withTrimUnusedFields(false).withConvertTableAccess(false).build() val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( @@ -140,7 +141,7 @@ class FlinkPlannerImpl( validator.setIdentifierExpansion(true) val validatedSqlNode: SqlNode = validator.validate(sqlNode) val rexBuilder: RexBuilder = createRexBuilder - val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + val cluster: RelOptCluster = FlinkRelOptClusterFactory.create(planner, rexBuilder) val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder .withTrimUnusedFields(false).withConvertTableAccess(false).build val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( http://git-wip-us.apache.org/repos/asf/flink/blob/86d32ac8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala index aabf4c9..6430ce0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala @@ -84,12 +84,7 @@ object FlinkRelBuilder { val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty()) planner.setExecutor(config.getExecutor) planner.addRelTraitDef(ConventionTraitDef.INSTANCE) 
- val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory)) - cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE) - // just set metadataProvider is not enough, see - // https://www.mail-archive.com/dev@calcite.apache.org/msg00930.html - RelMetadataQuery.THREAD_PROVIDERS.set( - JaninoRelMetadataProvider.of(cluster.getMetadataProvider)) + val cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory)) val calciteSchema = CalciteSchema.from(config.getDefaultSchema) val relOptSchema = new CalciteCatalogReader( calciteSchema, http://git-wip-us.apache.org/repos/asf/flink/blob/86d32ac8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelOptClusterFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelOptClusterFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelOptClusterFactory.scala new file mode 100644 index 0000000..24fdb9e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelOptClusterFactory.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.calcite + +import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner} +import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQuery} +import org.apache.calcite.rex.RexBuilder +import org.apache.flink.table.plan.cost.FlinkDefaultRelMetadataProvider + +/** + * The utility class is to create special [[RelOptCluster]] instance which use + * [[FlinkDefaultRelMetadataProvider]] instead of [[DefaultRelMetadataProvider]]. + */ +object FlinkRelOptClusterFactory { + + def create(planner: RelOptPlanner, rexBuilder: RexBuilder): RelOptCluster = { + val cluster = RelOptCluster.create(planner, rexBuilder) + cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE) + // just set metadataProvider is not enough, see + // https://www.mail-archive.com/dev@calcite.apache.org/msg00930.html + RelMetadataQuery.THREAD_PROVIDERS.set( + JaninoRelMetadataProvider.of(cluster.getMetadataProvider)) + cluster + } + +}
