[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121708#comment-16121708
 ] 

ASF GitHub Bot commented on FLINK-6094:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r132300742
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeNonWindowInnerJoin.scala
 ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.flink.table.runtime.join
    +
    +import org.apache.flink.api.common.functions.RichFlatJoinFunction
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
TupleTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +
    +/**
    +  * Connect data for left stream and right stream. Only use for innerJoin.
    +  *
    +  * @param joiner           join function
    +  * @param leftType         the input type of left stream
    +  * @param rightType        the input type of right stream
    +  * @param resultType       the output type of join
    +  * @param queryConfig
    +  */
    +class ProcTimeNonWindowInnerJoin(
    +    joiner: RichFlatJoinFunction[Row, Row, Row],
    +    leftType: TypeInformation[Row],
    +    rightType: TypeInformation[Row],
    +    resultType: TypeInformation[CRow],
    +    queryConfig: StreamQueryConfig) extends
    +  CoProcessFunction[CRow, CRow, CRow] with ResultTypeQueryable[CRow] {
    +
    +
    +  // state to hold left stream element
    +  private var leftState: MapState[Row, JTuple2[Int, Long]] = null
    +  // state to hold right stream element
    +  private var rightState: MapState[Row, JTuple2[Int, Long]] = null
    +  private var cRowWrapper: CRowWrappingMultiOuputCollector = null
    +
    +  private val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
    +  private val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
    +  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
    +
    +  // state to record last timer of left stream, 0 means no timer
    +  private var timerState1: ValueState[Long] = _
    +  // state to record last timer of right stream, 0 means no timer
    +  private var timerState2: ValueState[Long] = _
    +
    +
    +  override def open(parameters: Configuration): Unit = {
    +    // initialize left and right state
    +    val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
    +    val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
    +      "left", leftType, tupleTypeInfo)
    +    val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
    +      "right", rightType, tupleTypeInfo)
    +    leftState = getRuntimeContext.getMapState(leftStateDescriptor)
    +    rightState = getRuntimeContext.getMapState(rightStateDescriptor)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1 = new 
ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    +    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
    +    val valueStateDescriptor2 = new 
ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
    +
    +    cRowWrapper = new CRowWrappingMultiOuputCollector()
    +    joiner.setRuntimeContext(getRuntimeContext)
    +    joiner.open(parameters)
    +  }
    +
    +  /**
    +    * Process left stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement1(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(valueC, ctx, out, timerState1, leftState, rightState, 
true)
    +  }
    +
    +  /**
    +    * Process right stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement2(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(valueC, ctx, out, timerState2, rightState, leftState, 
false)
    +  }
    +
    +
    +  /**
    +    * Called when a processing timer trigger.
    +    * Expire left/right records which are expired in left and right state.
    +    *
    +    * @param timestamp The timestamp of the firing timer.
    +    * @param ctx       The ctx to register timer or get current time
    +    * @param out       The collector for returning result values.
    +    */
    +  override def onTimer(
    +      timestamp: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +      out: Collector[CRow]): Unit = {
    +
    +    if (stateCleaningEnabled && timerState1.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftState,
    +        timerState1,
    +        ctx
    +      )
    +    }
    +
    +    if (stateCleaningEnabled && timerState2.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightState,
    +        timerState2,
    +        ctx
    +      )
    +    }
    +  }
    +
    +
    +  def getNewExpiredTime(
    +      curProcessTime: Long,
    +      oldExpiredTime: Long): Long = {
    +
    +    var newExpiredTime = oldExpiredTime
    +    if (stateCleaningEnabled) {
    +      if (-1 == oldExpiredTime || (curProcessTime + minRetentionTime) > 
oldExpiredTime) {
    +        newExpiredTime = curProcessTime + maxRetentionTime
    +      }
    +    }
    +    newExpiredTime
    +  }
    +
    +  /**
    +    * Puts or Retract an element from the input stream into state and 
search the other state to
    +    * output records meet the condition. Records will be expired in state 
if state retention time
    +    * has been specified.
    +    */
    +  def processElement(
    +      value: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      timerState: ValueState[Long],
    +      currentSideState: MapState[Row, JTuple2[Int, Long]],
    +      otherSideState: MapState[Row, JTuple2[Int, Long]],
    +      isLeft: Boolean): Unit = {
    +
    +    cRowWrapper.out = out
    +    cRowWrapper.setChange(value.change)
    +
    +    val curProcessTime = ctx.timerService.currentProcessingTime
    +    var oldCnt = 0
    +    var oldExpiredTime: Long = -1
    +
    +    val currentRowCntAndExpiredTime = currentSideState.get(value.row)
    +    if (currentRowCntAndExpiredTime != null) {
    +      oldCnt = currentRowCntAndExpiredTime.f0
    +      oldExpiredTime = currentRowCntAndExpiredTime.f1
    +    }
    +
    +    val newExpiredTime = getNewExpiredTime(curProcessTime, oldExpiredTime)
    +    if (stateCleaningEnabled && timerState.value() == 0) {
    +      timerState.update(newExpiredTime)
    +      ctx.timerService().registerProcessingTimeTimer(newExpiredTime)
    +    }
    +
    +    // update current side stream state
    +    if (!value.asInstanceOf[CRow].change) {
    +      oldCnt = oldCnt - 1
    +      if (oldCnt <= 0) {
    +        currentSideState.remove(value.row)
    +      } else {
    +        currentSideState.put(value.row, JTuple2.of(oldCnt, newExpiredTime))
    +      }
    +    } else {
    +      oldCnt = oldCnt + 1
    --- End diff --
    
    -> `val newCnt = oldCnt + 1`


> Implement stream-stream proctime non-window  inner join
> -------------------------------------------------------
>
>                 Key: FLINK-6094
>                 URL: https://issues.apache.org/jira/browse/FLINK-6094
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to