[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010856#comment-16010856
 ] 

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r116521149
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.RelCollation
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.AggregateFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import java.util.{ List => JList, ArrayList }
    +import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, 
TypeInformation }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import java.sql.Timestamp
    +import org.apache.calcite.rel.RelFieldCollation
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +import java.util.Comparator
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import 
java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
    +import java.math.{BigDecimal=>JBigDecimal}
    +import org.apache.flink.api.common.functions.MapFunction
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.calcite.rex.{RexLiteral, RexNode}
    +import org.apache.flink.api.common.ExecutionConfig
    +import org.apache.flink.api.common.typeinfo.AtomicType
    +import org.apache.flink.api.java.typeutils.runtime.RowComparator
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * Class represents a collection of helper methods to build the sort logic.
    + * It encapsulates as well the implementation for ordering and generic 
interfaces
    + */
    +
    +object SortUtil {
    +
    +  
    +  /**
    +   * Function creates 
[org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
    +   * elements based on rowtime and potentially other fields
    +   * @param collationSort The Sort collation list
    +   * @param inputType input row type
    +   * @param execCfg table environment execution configuration
    +   * @return org.apache.flink.streaming.api.functions.ProcessFunction
    +   */
    +  private[flink] def createRowTimeSortFunction(
    +    collationSort: RelCollation,
    +    inputType: RelDataType,
    +    inputTypeInfo: TypeInformation[Row],
    +    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
    +
    +    val keySortFields = getSortFieldIndexList(collationSort)
    +    val keySortDirections = getSortFieldDirectionList(collationSort)
    +
    +       //drop time from comparison as we sort on time in the states and 
result emission
    +    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
    +    val keyDirectionsNoTime = keySortDirections.slice(1, 
keySortDirections.size)
    +    val booleanOrderings = getSortFieldDirectionBooleanList(collationSort)
    +    val booleanDirectionsNoTime = booleanOrderings.slice(1, 
booleanOrderings.size)
    +    
    +    val fieldComps = createFieldComparators(inputType, 
    +        keyIndexesNoTime, keyDirectionsNoTime, execCfg)
    +    val fieldCompsRefs = 
fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]]
    +    
    +    val rowComp = createRowComparator(inputType,
    +        keyIndexesNoTime, fieldCompsRefs, booleanDirectionsNoTime)
    +    val collectionRowComparator = new CollectionRowComparator(rowComp)
    +    
    +    val inputCRowType = CRowTypeInfo(inputTypeInfo)
    + 
    +    new RowTimeSortProcessFunction(
    +      inputType.getFieldCount,
    +      inputCRowType,
    +      collectionRowComparator)
    +
    +  }
    +  
    +  
    +  /**
    +   * Function creates 
[org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
    +   * elements based on proctime and potentially other fields
    +   * @param collationSort The Sort collation list
    +   * @param inputType input row type
    +   * @param execCfg table environment execution configuration
    +   * @return org.apache.flink.streaming.api.functions.ProcessFunction
    +   */
    +  private[flink] def createProcTimeSortFunction(
    +    collationSort: RelCollation,
    +    inputType: RelDataType,
    +    inputTypeInfo: TypeInformation[Row],
    +    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
    +
    +    val keySortFields = getSortFieldIndexList(collationSort)
    +    val keySortDirections = getSortFieldDirectionList(collationSort)
    +
    +    
    +       //drop time from comparison
    +    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
    +    val keyDirectionsNoTime = keySortDirections.slice(1, 
keySortDirections.size)
    +    val booleanOrderings = getSortFieldDirectionBooleanList(collationSort)
    +    val booleanDirectionsNoTime = booleanOrderings.slice(1, 
booleanOrderings.size)
    +    
    +    val fieldComps = createFieldComparators(inputType, 
    +        keyIndexesNoTime, keyDirectionsNoTime, execCfg)
    +    val fieldCompsRefs = 
fieldComps.asInstanceOf[Array[TypeComparator[AnyRef]]]
    +    
    +    val rowComp = createRowComparator(inputType, 
    +        keyIndexesNoTime, fieldCompsRefs, booleanDirectionsNoTime)
    +    val collectionRowComparator = new CollectionRowComparator(rowComp)
    +    
    +    val inputCRowType = CRowTypeInfo(inputTypeInfo)
    +    
    +    new ProcTimeSortProcessFunction(
    +      inputType.getFieldCount,
    +      inputCRowType,
    +      collectionRowComparator)
    +
    +  }
    +
    +  
    +   /**
    +   * Function creates comparison objects based on the field types
    +   * @param inputType input row type
    +   * @param keyIndex the indexes of the fields on which the sorting is 
done. 
    +   * @param orderDirection the directions of each sort field. 
    +   * @param execConfig the configuration environment
    +   * @return Array of TypeComparator objects
    +   */
    +  def createFieldComparators(
    +      inputType: RelDataType,
    +      keyIndex: Array[Int],
    +      orderDirection: Array[Direction],
    +      execConfig: ExecutionConfig): Array[TypeComparator[_]] = {
    +    
    +    var iOrder = 0
    +    for (i <- keyIndex) yield {
    +
    +      val order = if (orderDirection(iOrder) == Direction.ASCENDING) true 
else false
    +      iOrder += 1
    +      val fieldTypeInfo = 
FlinkTypeFactory.toTypeInfo(inputType.getFieldList.get(i).getType)
    +      fieldTypeInfo match {
    +        case a: AtomicType[_] => a.createComparator(order, execConfig)
    +        case _ => throw new TableException(s"Unsupported field type 
$fieldTypeInfo to sort on.")
    +      }
    +    }
    +  }
    +  
    +   /**
    +   * Function creates a RowComparator based on the typed comparators
    +   * @param inputRowType input row type
    +   * @param fieldIdxs the indexes of the fields on which the sorting is 
done. 
    +   * @param fieldComps the array of typed comparators
    +   * @param fieldOrders the directions of each sort field (true = ASC). 
    +   * @return A Row TypeComparator object 
    +   */
    +  def createRowComparator(
    +    inputRowType: RelDataType,
    +    fieldIdxs: Array[Int],
    +    fieldComps: Array[TypeComparator[AnyRef]],
    +    fieldOrders: Array[Boolean]): TypeComparator[Row] = {
    +
    +  val rowComp = new RowComparator(
    +    inputRowType.getFieldCount,
    +    fieldIdxs,
    +    fieldComps,
    +    new Array[TypeSerializer[AnyRef]](0), //used only for serialized 
comparisons
    +    fieldOrders)
    +
    +  rowComp
    +}
    +  
    + 
    +  /**
    +   * Function returns the array of indexes for the fields on which the 
sort is done
    +   * @param collationSort The Sort collation list
    +   * @return [Array[Int]]
    +   */
    +  def getSortFieldIndexList(collationSort: RelCollation): Array[Int] = {
    +    val keyFields = collationSort.getFieldCollations.toArray()
    +    for (f <- keyFields) yield 
f.asInstanceOf[RelFieldCollation].getFieldIndex
    +  }
    +  
    +   /**
    +   * Function returns the array of sort direction for each of the sort 
fields 
    +   * @param collationSort The Sort collation list
    +   * @return [Array[Direction]]
    +   */
    +  def getSortFieldDirectionList(collationSort: RelCollation): 
Array[Direction] = {
    --- End diff --
    
    Is not needed. We can use `getSortFieldDirectionBooleanList` instead


> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
>                 Key: FLINK-6075
>                 URL: https://issues.apache.org/jira/browse/FLINK-6075
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>         Attachments: sort.png
>
>
> These will be split in 3 separated JIRA issues. However, the design is the 
> same only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit as the sort 
> operation would never trigger. If an SQL query will be provided without 
> limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime 
> indicates the processing time when a new event arrives in the system. Events 
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) 
> ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `)
> ||Rowtime||   Proctime||      Stream1||       Limit 2||       Top 2|| Sort 
> [ASC]||
> |         |10:00:00  |(aaa, 11)       |               |             |         
>    |
> |         |10:05:00    |(aab, 7)  |           |             |            |
> |10-11          |11:00:00  |          |       aab,aaa |aab,aaa  |     aab,aaa 
>    |
> |         |11:03:00  |(aac,21)  |           |         |            |          
>         
> |11-12    |12:00:00  |          |     aab,aaa |aab,aaa  |     aab,aaa,aac|
> |         |12:10:00  |(abb,12)  |           |         |            |          
>         
> |         |12:15:00  |(abb,12)  |           |         |            |          
>         
> |12-13          |13:00:00  |          |       abb,abb | abb,abb |     
> abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, 
> the functionality will be implemented within the logic of the window as 
> follows.
> * Window assigner – selected based on the type of window used in SQL 
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the 
> window boundaries
> * Apply – window function that sorts data and selects the output to trigger 
> (based on LIMIT/TOP parameters). All data will be sorted at once and result 
> outputted when the window is triggered
> An alternative implementation can be to use a fold window function to sort 
> the elements as they arrive, one at a time followed by a flatMap to filter 
> the number of outputs. 
> !sort.png!
> **General logic of Join**
> ```
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
> //.trigger(new [Time/Count]Trigger()) – use default
> //.evictor(new [Time/Count]Evictor()) – use default
>               .apply(SortAndFilter());
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to