[
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974863#comment-15974863
]
ASF GitHub Bot commented on FLINK-6075:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3714#discussion_r112208301
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream,
DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window =>
DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo,
TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+ * Flink RelNode which matches along with Sort Rule.
+ *
+ */
+class DataStreamSort(
+ sort: LogicalSort,
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ description: String)
+ extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+ override def deriveRowType(): RelDataType = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs:
java.util.List[RelNode]): RelNode = {
+ new DataStreamSort(
+ sort,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ rowRelDataType,
+ inputType,
+ description + sort.getId())
+ }
+
+ override def toString: String = {
+ s"Sort($sort)" +
+ " on fields: (${sort.collation.getFieldCollations})"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .item("aggregate", sort)
+ .item("sort fields",sort.collation.getFieldCollations)
+ .itemIf("offset", sort.offset, sort.offset!=null)
+ .itemIf("fetch", sort.fetch, sort.fetch!=null)
+ .item("input", inputNode)
+ }
+
+ override def translateToPlan(tableEnv: StreamTableEnvironment):
DataStream[Row] = {
+
+ val inputDS =
getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ //need to identify time between others order fields. Time needs to be
first sort element
+ val timeType = SortUtil.getTimeType(sort,inputType)
+
+ //time ordering needs to be ascending
+ if (SortUtil.getTimeDirection(sort) != Direction.ASCENDING) {
+ throw new TableException("SQL/Table supports only ascending time
ordering")
+ }
+
+
+ val (offset,fetch) = (sort.offset,sort.fetch)
+
+ //enable to extend for other types of aggregates that will not be
implemented in a window
+ timeType match {
+ case _: ProcTimeType =>
+ (offset,fetch) match {
+ case (o:Any,f:Any) => null // offset and fetch
needs retraction
+ case (_,f:Any) => null // offset needs
retraction
+ case (o:Any,_) => null // fetch needs
retraction
+ case _ => createSortProcTime(inputDS) //sort can be done
with/without retraction
+ }
+ case _: RowTimeType =>
+ throw new TableException("SQL/Table does not support sort on row
time")
+ case _ =>
+ throw new TableException("SQL/Table needs to have sort on time
as first sort element")
+ }
+
+ }
+
+ /**
+ * Create Sort logic based on processing time
+ */
+ def createSortProcTime(
+ inputDS: DataStream[Row]): DataStream[Row] = {
+
+
+ // get the output types
+ //Sort does not do project.= Hence it will output also the ordering
proctime field
+ //[TODO]Do we need to drop some of the ordering fields? (implement a
projection logic?
--- End diff --
No, Sort preserves the data type. Fields would be dropped by a
preceding or following Calc.
> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: radu
> Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the
> same for all of them; only the processing function differs in terms of the
> output.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL
> '3' HOUR) ORDER BY b`
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING
> LIMIT 10) FROM stream1`
> Comment: limit over the contents of the sliding window
> General Comments:
> - All these SQL clauses are supported only over windows (bounded collections
> of data).
> - Each of the 3 operators will be supported with each of the ways of
> expressing windows.
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior, as they all
> require a sorted collection of the data on which the logic will be applied
> (i.e., selecting a subset of the items or the entire sorted set). These
> functions make sense in a streaming context only within a window. Without a
> window defined, the functions could never emit, as the sort operation would
> never trigger. If an SQL query is provided without window bounds, an error
> will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR).
> Although not targeted by this JIRA, in the case of working based on event
> time order, the retraction mechanisms of windows and the lateness mechanisms
> can be used to deal with out of order events and retraction/updates of
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting,
> limit and top). Rowtime indicates when the HOP window will trigger – which
> can be observed in the fact that outputs are generated only at those moments.
> The HOP windows will trigger at every hour (fixed hour) and each event will
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime
> indicates the processing time when a new event arrives in the system. Events
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])`
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00 |(aaa, 11) | | | |
> | |10:05:00 |(aab, 7) | | | |
> |10-11 |11:00:00 | |aab,aaa |aab,aaa |aab,aaa |
> | |11:03:00 |(aac,21) | | | |
> |11-12 |12:00:00 | |aab,aaa |aab,aaa |aab,aaa,aac |
> | |12:10:00 |(abb,12) | | | |
> | |12:15:00 |(abb,12) | | | |
> |12-13 |13:00:00 | |abb,abb |abb,abb |abb,abb,aac |
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries,
> the functionality will be implemented within the logic of the window as
> follows.
> * Window assigner – selected based on the type of window used in SQL
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the
> window boundaries
> * Apply – window function that sorts data and selects the output to trigger
> (based on LIMIT/TOP parameters). All data will be sorted at once and result
> outputted when the window is triggered
> An alternative implementation can be to use a fold window function to sort
> the elements as they arrive, one at a time followed by a flatMap to filter
> the number of outputs.
> !sort.png!
> **General logic of Sort/Limit/Top**
> ```
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
> //.trigger(new [Time/Count]Trigger()) – use default
> //.evictor(new [Time/Count]Evictor()) – use default
> .apply(SortAndFilter());
> ```
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)