[ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972466#comment-15972466
 ] 

ASF GitHub Bot commented on FLINK-6149:
---------------------------------------

GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3594#discussion_r111918921
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala ---
    @@ -0,0 +1,109 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.rel.logical
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rel.logical.LogicalTableScan
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{RelNode, RelWriter}
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.rel.FlinkConventions
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConverters._
    +
    +class FlinkLogicalTableSourceScan(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    table: RelOptTable,
    +    val tableSource: TableSource[_])
    +  extends TableScan(cluster, traitSet, table)
    +  with FlinkLogicalRel {
    +
    +  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): 
FlinkLogicalTableSourceScan = {
    +    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, 
tableSource)
    +  }
    +
    +  override def deriveRowType(): RelDataType = {
    +    val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +    flinkTypeFactory.buildRowDataType(
    +      TableEnvironment.getFieldNames(tableSource),
    +      TableEnvironment.getFieldTypes(tableSource.getReturnType))
    +  }
    +
    +  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
    +    val rowCnt = metadata.getRowCount(this)
    +    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(getRowType))
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    I see, +1


> add additional flink logical relation nodes
> -------------------------------------------
>
>                 Key: FLINK-6149
>                 URL: https://issues.apache.org/jira/browse/FLINK-6149
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kurt Young
>            Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to