[
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16036251#comment-16036251
]
ASF GitHub Bot commented on FLINK-6073:
---------------------------------------
Github user rtudoran commented on a diff in the pull request:
https://github.com/apache/flink/pull/3609#discussion_r120007611
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream,
DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window =>
DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo,
TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import
org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import
org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
+import java.lang.Iterable
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import
org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.util.Collector
+
+class DataStreamJoin(
+ calc: LogicalJoin,
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputLeft: RelNode,
+ inputRight: RelNode,
+ rowType: RelDataType,
+ description: String)
+ extends BiRel(cluster, traitSet, inputLeft, inputRight) with
DataStreamRel {
+
+ override def deriveRowType(): RelDataType = rowType
+
+ override def copy(traitSet: RelTraitSet, inputs:
java.util.List[RelNode]): RelNode = {
+ new DataStreamJoin(
+ calc,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ inputs.get(1),
+ rowType,
+ description + calc.getId())
+ }
+
+ override def toString: String = {
+ s"Join(${
+ if (!calc.getCondition.isAlwaysTrue()) {
+ s"condition: (${calc.getCondition}), "
+ } else {
+ ""
+ }
+ }left: ($inputLeft), right($inputRight))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("condition", calc.getCondition,
!calc.getCondition.isAlwaysTrue())
+ .item("join", calc)
+ .item("left", inputLeft)
+ .item("right", inputRight)
+ }
+
+ override def translateToPlan(tableEnv: StreamTableEnvironment):
DataStream[Row] = {
+
+ val inputDSLeft =
inputLeft.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDSRight =
inputRight.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ //define the setup for various types of joins to be supported
+ (calc.getCondition.isAlwaysTrue(), calc.getJoinType) match {
+ case (true, JoinRelType.LEFT) =>
+ createInnerQueryJoin(inputDSLeft, inputDSRight)
+ case (_, _) =>
+ throw new TableException("Table does not support this type of
JOIN.")
+ }
+
+ null
+ }
+
+ def createInnerQueryJoin(
+ inputDSLeft: DataStream[Row], inputDSRight: DataStream[Row]):
DataStream[Row] = {
+
+ // get the output types
+ val rowTypeInfo =
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+ val result = inputDSLeft.join(inputDSRight)
+ .where(new EmptyKeySelector()).equalTo(new EmptyKeySelector())
+ .window(GlobalWindows.create())
+ .trigger(new ProcTimeLeftJoinTrigger())
+ .evictor(new FullEvictor())
+ .apply(new JoinProcTimeForInnerQuerry(rowTypeInfo))
+
+ null
+ }
+
+}
+
+class EmptyKeySelector extends KeySelector[Row, Integer] {
+ override def getKey(value: Row): Integer = {
+ 0
+ }
+}
+
+class ProcTimeLeftJoinTrigger extends Trigger[Object, GlobalWindow] with
Serializable {
+
+ /*
+ * Check if element comes from the left stream case in which we should
fire
+ */
+ override def onElement(element: Object,
+ timestamp: Long,
+ window: GlobalWindow, ctx: TriggerContext): TriggerResult = {
+ element match {
+ case elementPair: TaggedUnion[Row, Row] => {
+ if (elementPair.isOne()) {
+ TriggerResult.FIRE_AND_PURGE
+ } else {
+ TriggerResult.CONTINUE
+ }
+ }
+ case _ => TriggerResult.CONTINUE
+ }
+ }
+
+ /*
+ * We operate on processing time so we move on each element
+ */
+ override def onProcessingTime(timestamp: Long, window:
+ GlobalWindow, ctx: TriggerContext): TriggerResult = {
+ TriggerResult.CONTINUE
+ }
+
+ /*
+ * We operate on processing time so we move on each element
+ */
+ override def onEventTime(timestamp: Long,
+ window: GlobalWindow,
+ ctx: TriggerContext): TriggerResult = {
+ TriggerResult.CONTINUE
+ }
+
+ override def clear(window: GlobalWindow, ctx: TriggerContext): Unit = {
+
+ }
+}
+
+class FullEvictor extends Evictor[Object, GlobalWindow] {
+ override def evictBefore(x1: Iterable[TimestampedValue[Object]],
+ size: Int, window: GlobalWindow, ctx: EvictorContext): Unit = {
+ }
+
+ override def evictAfter(x1: Iterable[TimestampedValue[Object]],
+ size: Int, window: GlobalWindow, ctx: EvictorContext): Unit = {
+ val iter = x1.iterator()
+ while (iter.hasNext()) {
+ iter.remove()
+ }
+ }
+}
+
+class JoinProcTimeForInnerQuerry(
+ private val rowTypeInfo: RowTypeInfo) extends
RichFlatJoinFunction[Row, Row, Row] {
+
+ private var lastValueRight: ValueState[Row] = _
+
+ override def open(configuration: Configuration): Unit = {
+ val stateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("overState", rowTypeInfo)
+ lastValueRight = getRuntimeContext.getState(stateDescriptor)
+ }
+ override def join(first: Row, second: Row, out: Collector[Row]): Unit = {
+
+ var secondarity = 0
+ var secondR = second
+ if (second != null) {
+ lastValueRight.update(second)
+ secondarity = second.getArity
+ } else {
+ secondR = lastValueRight.value()
+ if (secondR != null) {
+ secondarity = secondR.getArity
+ }
+ }
+
+ if (first != null) {
+ val outrez = new Row(first.getArity + secondarity)
+ var i = 0
+ while (i < first.getArity) {
+ outrez.setField(i, first.getField(i))
+ i += 1
+ }
+ i = 0
+ while (i < secondarity) {
+ outrez.setField(i, secondR.getField(i))
--- End diff --
The target field index needs to account for the values already copied from the left side: use `i + first.getArity` instead of `i`.
> Support for SQL inner queries for proctime
> ------------------------------------------
>
> Key: FLINK-6073
> URL: https://issues.apache.org/jira/browse/FLINK-6073
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: radu
> Assignee: radu
> Priority: Critical
> Labels: features
> Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>
> Q1) `Select item, (select item2 from stream2 ) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: enabling the
> main query to include results from an inner query.
> Q2) `Select s1.item, (Select a2 from table as t2 where table.id = s1.id
> limit 1) from s1;`
> Comments:
> Another equivalent way to write the first example of an inner query is with
> limit 1. This ensures equivalence with the SingleElementAggregation used
> when translating the main target syntax for the inner query. We must ensure that
> the 2 syntaxes are supported and implemented with the same functionality.
> There is also the option to select elements in the inner query from a table,
> not just from a different stream. A sub-JIRA issue should be created to implement
> this support.
> **Description:**
> Parsing the SQL inner query via calcite is translated to a join function
> (left join with always true condition) between the output of the query on the
> main stream and the output of a single output aggregation operation on the
> inner query. The translation logic is shown below
> ```
> LogicalJoin [condition=true;type=LEFT]
> LogicalSingleValue[type=aggregation]
> …logic of inner query (LogicalProject, LogicalScan…)
> …logical of main,external query (LogicalProject, LogicalScan…))
> ```
> `LogicalJoin[condition=true;type=LEFT] `– it can be considered as a special
> case operation rather than a proper join to be implemented between
> stream-to-stream. The implementation behavior should attach to the main
> stream output a value from a different query.
> `LogicalSingleValue[type=aggregation]` – it can be interpreted as the holder
> of the single value that results from the inner query. As this operator
> guarantees that the inner query will bring no more than one value to the
> join, there are several options on how to consider its functionality in the
> streaming context:
> 1. Throw an error if the inner query returns more than one result. This
> would be a typical behavior in the case of standard SQL over DB. However, it
> is very unlikely that a stream would only emit a single value. Therefore,
> such a behavior would be very limited for streams in the inner query.
> However, such a behavior might be more useful and common if the inner query
> is over a table.
> 2. We can interpret the usage of this parameter as the guarantee that at
> any one moment only one value is selected. Therefore the behavior would rather be
> that of a filter that selects one value. This brings the option that the output of
> this operator evolves in time with the second stream that drives the inner
> query. The decision on when to evolve the stream should depend on what marks
> the evolution of the stream (processing time, watermarks/event time,
> ingestion time, window time partitions…).
> In this JIRA issue the evolution would be marked by the processing time. For
> this implementation the operator would work based on option 2. Hence at every
> moment the state of the operator that holds one value can evolve with the
> last elements. In this way the logic of the inner query is to select always
> the last element (fields, or other query related transformations based on the
> last value). This behavior is needed in many scenarios: (e.g., the typical
> problem of computing the total income, when incomes are in multiple
> currencies and the total needs to be computed in one currency by using always
> the last exchange rate).
> This behavior is motivated also by the functionality of the 3rd SQL query
> example – Q3 (using inner query as the input source for FROM ). In such
> scenarios, the selection in the main query would need to be done based on
> latest elements. Therefore with such a behavior the 2 types of queries (Q1
> and Q3) would provide the same, intuitive result.
> **Functionality example**
> Based on the logical translation plan, we exemplify next the behavior of the
> inner query applied on 2 streams that operate on processing time.
> SELECT amount, (SELECT exchange FROM inputstream1) AS field1 FROM inputstream2
> ||Time||Stream1||Stream2||Output||
> |T1| | 1.2| |
> |T2|User1,10| | (10,1.2)|
> |T3|User2,11| | (11,1.2)|
> |T4| | 1.3| |
> |T5|User3,9 | | (9,1.3)|
> |...|
> Note 1. For streams that would operate on event time, at moment T3 we would
> need to retract the previous outputs ((10, 1.2), (11,1.2) ) and reemit them
> as ((10,1.3), (11,1.3) ).
> Note 2. Rather than failing when a new value arrives from the inner query, we just
> update the state that holds the single value. If option 1 for the behavior of
> LogicalSingleValue is chosen, then an error should be triggered at moment T3.
> **Implementation option**
> Considering the notes and the chosen option for the behavior, the operator would be
> implemented by using the join function of Flink with a custom always-true
> join condition and an inner selection for the output based on the incoming
> direction (to mimic the left join). The single value selection can be
> implemented over a stateful flat map. In case the join is executed in
> parallel by multiple operators, then we either use a parallelism of 1 for the
> stateful flatmap (option 1) or we broadcast the outputs of the flatmap to
> all join instances to ensure consistency of the results (option 2).
> Considering that the flatMap functionality of selecting one value is light,
> option 1 is better. The design schema is shown below.
> !innerquery.png!
> **General logic of Join**
> ```
> leftDataStream.join(rightDataStream)
> .where(new ConstantConditionSelector())
> .equalTo(new ConstantConditionSelector())
> .window(window.create())
> .trigger(new LeftFireTrigger())
> .evictor(new Evictor())
> .apply(JoinFunction());
> ```
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)