Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r152753104
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
    +import org.apache.flink.table.api.TableConfig
    +import org.apache.flink.table.plan.nodes.FlinkConventions
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
    +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
    +import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
    +import org.apache.flink.table.runtime.join.WindowJoinUtil
    +
    +import scala.collection.JavaConverters._
    +
    +class DataStreamJoinRule
    +  extends ConverterRule(
    +    classOf[FlinkLogicalJoin],
    +    FlinkConventions.LOGICAL,
    +    FlinkConventions.DATASTREAM,
    +    "DataStreamJoinRule") {
    +
    +  /**
    +    * Checks if an expression accesses a time attribute.
    +    *
    +    * @param expr The expression to check.
    +    * @param inputType The input type of the expression.
    +    * @return True, if the expression accesses a time attribute. False otherwise.
    +    */
    +  def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
    +    expr match {
    +      case i: RexInputRef =>
    +        val accessedType = inputType.getFieldList.get(i.getIndex).getType
    +        accessedType match {
    +          case _: TimeIndicatorRelDataType => true
    +          case _ => false
    +        }
    +      case c: RexCall =>
    +        c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
    +      case _ => false
    +    }
    +  }
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
    +    val joinInfo = join.analyzeCondition
    +
    +    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
    +      joinInfo.getRemaining(join.getCluster.getRexBuilder),
    +      join.getLeft.getRowType.getFieldCount,
    +      join.getRowType,
    +      join.getCluster.getRexBuilder,
    +      TableConfig.DEFAULT)
    +
    +    // remaining predicate must not access time attributes
    +    val remainingPredsAccessTime = remainingPreds.isDefined &&
    +      accessesTimeAttribute(remainingPreds.get, join.getRowType)
    +
    +    !windowBounds.isDefined && !remainingPredsAccessTime
    --- End diff --
    
    Replace `!windowBounds.isDefined` with `windowBounds.isEmpty`.


---

Reply via email to