Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3590#discussion_r107604045
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
    @@ -119,6 +150,57 @@ class DataStreamOverAggregate(
     
       }
     
    +  def createTimeBoundedProcessingTimeOverWindow(inputDS: DataStream[Row]): 
DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
    +
    +    val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
    +    val count = input.getRowType().getFieldCount()
    +    val lowerboundIndex = index - count
    +    
    +    
    +    val time_boundary = 
logicWindow.constants.get(lowerboundIndex).getValue2 match {
    +      case _: java.math.BigDecimal => 
logicWindow.constants.get(lowerboundIndex)
    +         .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
    +      case _ => throw new TableException("OVER Window boundaries must be 
numeric")
    +    }
    +
    +     // get the output types
    +    val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +         
    +    val result: DataStream[Row] =
    +        // partitioned aggregation
    +        if (partitionKeys.nonEmpty) {
    +          
    +          val processFunction = 
AggregateUtil.CreateTimeBoundedProcessingOverProcessFunction(
    +            namedAggregates,
    +            inputType,
    +            time_boundary)
    +          
    +          inputDS
    +          .keyBy(partitionKeys: _*)
    +          .process(processFunction)
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +        } else { // non-partitioned aggregation
    +          val processFunction = 
AggregateUtil.CreateTimeBoundedProcessingOverProcessFunction(
    --- End diff --
    
    @sunjincheng121 Thanks for the suggestion. As i mentioned bellow - i do not 
think using the MapState is a good option because we need to go through all the 
elements to remove and retract from the accumulator. That is O(n) complexity. 
This really makes it equivalent with the window based implementation. 
    However, what i meant is that we should use
    Queue[JTuple2[Long,Row]]...we can put this either in a ValueState or any 
other form of a state....it does not matter. Than we do the operations over the 
contents directly on this.
    I would suggest also for your implementation to use this approach as it can 
be more efficient to keep the events timely sorted. Perhaps a Queue would not 
work for you but than you should use a (Double)LinkedList to keep the events 
sorted by the event time
    @fhueske 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to