[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121690#comment-16121690
 ] 

ASF GitHub Bot commented on FLINK-6094:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r132197433
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.flink.table.api.TableConfig
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.plan.nodes.FlinkConventions
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
    +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.WindowJoinUtil
    +import scala.collection.JavaConverters._
    +
    +class DataStreamJoinRule
    +  extends ConverterRule(
    +    classOf[FlinkLogicalJoin],
    +    FlinkConventions.LOGICAL,
    +    FlinkConventions.DATASTREAM,
    +    "DataStreamJoinRule") {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
    +    val joinInfo = join.analyzeCondition
    +
    +    val (windowBounds, remainingPreds) = 
WindowJoinUtil.extractWindowBoundsFromPredicate(
    +      joinInfo.getRemaining(join.getCluster.getRexBuilder),
    +      join.getLeft.getRowType.getFieldCount,
    +      join.getRowType,
    +      join.getCluster.getRexBuilder,
    +      TableConfig.DEFAULT)
    +
    +    // remaining predicate must not access time attributes
    +    val remainingPredsAccessTime = remainingPreds.isDefined &&
    +      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, 
join.getRowType)
    +
    +    // Check that no event-time attributes are in the input.
    +    val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
    +      .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    if (!windowBounds.isDefined && !remainingPredsAccessTime && 
!rowTimeAttrInOutput) {
    --- End diff --
    
    I'm not sure if this is the right condition for this join implementation.
    I think we need to check that both inputs have a finite size, i.e., there 
is a bounded number of records in the table at a given point in time. This is 
important because we need to keep all active records of both inputs in state.
    A table which is derived from a stream can have a finite size if:
    
    - it is the result of a non-windowed aggregation (`GROUP BY user`)
    - it is derived by an update stream->table conversion
    - it is the tail of a stream (e.g., `WHERE rowtime > now() - INTERVAL '4' 
HOUR` or `ORDER BY rowtime DESC FETCH 100 ROWS ONLY`)
    - there might be more cases.
    
    The windowed join is different in that is can process unbounded tables 
because it can limit the valid data ranges with the window boundaries. So, just 
because a join cannot be computed with the windowed join does not automatically 
mean we can process it with this implementation.
    
    Right now, only non-windowed aggregation is supported and hence the only 
way to obtain a finite table from a stream. 
    
    In `DataSetSingleRowJoinRule.isSingleRow()` we check if an input rel 
produces only a single row by tracking back the input. I think we need similar 
logic here to ensure that the input relation is of finite size (in our case has 
a non-windowed `Aggregate`).


> Implement stream-stream proctime non-window  inner join
> -------------------------------------------------------
>
>                 Key: FLINK-6094
>                 URL: https://issues.apache.org/jira/browse/FLINK-6094
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to