Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r132197433
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.flink.table.api.TableConfig
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.plan.nodes.FlinkConventions
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
    +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.WindowJoinUtil
    +import scala.collection.JavaConverters._
    +
    +class DataStreamJoinRule
    +  extends ConverterRule(
    +    classOf[FlinkLogicalJoin],
    +    FlinkConventions.LOGICAL,
    +    FlinkConventions.DATASTREAM,
    +    "DataStreamJoinRule") {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
    +    val joinInfo = join.analyzeCondition
    +
    +    val (windowBounds, remainingPreds) = 
WindowJoinUtil.extractWindowBoundsFromPredicate(
    +      joinInfo.getRemaining(join.getCluster.getRexBuilder),
    +      join.getLeft.getRowType.getFieldCount,
    +      join.getRowType,
    +      join.getCluster.getRexBuilder,
    +      TableConfig.DEFAULT)
    +
    +    // remaining predicate must not access time attributes
    +    val remainingPredsAccessTime = remainingPreds.isDefined &&
    +      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, 
join.getRowType)
    +
    +    // Check that no event-time attributes are in the input.
    +    val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
    +      .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    if (!windowBounds.isDefined && !remainingPredsAccessTime && 
!rowTimeAttrInOutput) {
    --- End diff --
    
    I'm not sure if this is the right condition for this join implementation.
    I think we need to check that both inputs have a finite size, i.e., there 
is a bounded number of records in the table at a given point in time. This is 
important because we need to keep all active records of both inputs in state.
    A table which is derived from a stream can have a finite size if:
    
    - it is the result of a non-windowed aggregation (`GROUP BY user`)
    - it is derived by an update stream->table conversion
    - it is the tail of a stream (e.g., `WHERE rowtime > now() - INTERVAL '4' 
HOUR` or `ORDER BY rowtime DESC FETCH 100 ROWS ONLY`)
    - there might be more cases.
    
    The windowed join is different in that is can process unbounded tables 
because it can limit the valid data ranges with the window boundaries. So, just 
because a join cannot be computed with the windowed join does not automatically 
mean we can process it with this implementation.
    
    Right now, only non-windowed aggregation is supported and hence the only 
way to obtain a finite table from a stream. 
    
    In `DataSetSingleRowJoinRule.isSingleRow()` we check if an input rel 
produces only a single row by tracking back the input. I think we need similar 
logic here to ensure that the input relation is of finite size (in our case has 
a non-windowed `Aggregate`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to