dawidwys commented on a change in pull request #13413:
URL: https://github.com/apache/flink/pull/13413#discussion_r490915183



##########
File path: 
flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.basics;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Example for aggregating and ranking data using Flink SQL on updating (but 
bounded) streams.

Review comment:
       nit: Is the `(but bounded)` note necessary or important for the example? 
Just wondering if it helps users understand the use case/example or introduces 
unnecessary distraction/additional concept to understand.

##########
File path: 
flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.basics;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Example for aggregating and ranking data using Flink SQL on updating (but 
bounded) streams.
+ *
+ * <p>The example shows how to declare a table using SQL DDL for reading 
insert-only data and handling
+ * updating data. It should give a first impression about Flink SQL as a 
changelog processor. The example
+ * uses some streaming operations that produce a stream of updates. See the 
other examples for pure CDC
+ * processing and more complex operations.
+ *
+ * <p>In particular, the example shows how to
+ * <ul>
+ *     <li>setup a {@link TableEnvironment},
+ *     <li>use the environment for creating a CSV file with example data,
+ *     <li>aggregate the incoming INSERT changes,
+ *     <li>compute a top-N result,
+ *     <li>collect and materialize the updating stream locally.
+ * </ul>
+ *
+ * <p>The example executes two Flink jobs. The results are written to stdout.
+ *
+ * <p>Note: Make sure to include the SQL CSV format when submitting this 
example to Flink (e.g. via
+ * command line). This step is not necessary when executing the example in an 
IDE.
+ */
+public final class UpdatingTopCityExample {
+
+       public static void main(String[] args) throws Exception {
+               // prepare the session
+               final EnvironmentSettings settings = 
EnvironmentSettings.newInstance()
+                       .inStreamingMode()
+                       .build();
+               final TableEnvironment env = TableEnvironment.create(settings);
+
+               // create an empty temporary CSV directory for this example
+               final String populationDirPath = createTemporaryDirectory();
+
+               // register a table in the catalog that points to the CSV file
+               env.executeSql(
+                       "CREATE TABLE PopulationUpdates (" +
+                       "  city STRING," +
+                       "  state STRING," +
+                       "  update_year INT," +
+                       "  population_diff INT" +
+                       ") WITH (" +
+                       "  'connector' = 'filesystem'," +
+                       "  'path' = '" + populationDirPath + "'," +
+                       "  'format' = 'csv'" +
+                       ")"
+               );
+
+               // insert some example data into the table
+               final TableResult insertionResult = env.executeSql(
+                       "INSERT INTO PopulationUpdates VALUES" +
+                       "  ('Los Angeles', 'CA', 2013, 13106100), " +
+                       "  ('Los Angeles', 'CA', 2014, 72600), " +
+                       "  ('Los Angeles', 'CA', 2015, 72300), " +
+                       "  ('Chicago', 'IL', 2013, 9553270), " +
+                       "  ('Chicago', 'IL', 2014, 11340), " +
+                       "  ('Chicago', 'IL', 2015, -6730), " +
+                       "  ('Houston', 'TX', 2013, 6330660), " +
+                       "  ('Houston', 'TX', 2014, 172960), " +
+                       "  ('Houston', 'TX', 2015, 172940), " +
+                       "  ('Phoenix', 'AZ', 2013, 4404680), " +
+                       "  ('Phoenix', 'AZ', 2014, 86740), " +
+                       "  ('Phoenix', 'AZ', 2015, 89700), " +
+                       "  ('San Antonio', 'TX', 2013, 2280580), " +
+                       "  ('San Antonio', 'TX', 2014, 49180), " +
+                       "  ('San Antonio', 'TX', 2015, 50870), " +
+                       "  ('San Francisco', 'CA', 2013, 4521310), " +
+                       "  ('San Francisco', 'CA', 2014, 65940), " +
+                       "  ('San Francisco', 'CA', 2015, 62290), " +
+                       "  ('Dallas', 'TX', 2013, 6817520), " +
+                       "  ('Dallas', 'TX', 2014, 137740), " +
+                       "  ('Dallas', 'TX', 2015, 154020)"
+               );
+
+               // since all cluster operations of the Table API are executed 
asynchronously,
+               // we need wait until the insertion has been completed,

Review comment:
       ```suggestion
                // we need to wait until the insertion has been completed,
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Filter, JoinRelType, 
Project, Sort}
+import org.apache.calcite.rex.{RexCall, RexCorrelVariable, RexFieldAccess, 
RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank.
+ * Typically, the following plan
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+ *      :- LogicalAggregate(group=[{0}])
+ *      :  +- LogicalProject(state=[$1])
+ *      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ *      +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ *         +- LogicalProject(name=[$0], pop=[$2])
+ *            +- LogicalFilter(condition=[=($1, $cor0.state)])
+ *               +- LogicalTableScan(table=[[default_catalog, 
default_database, cities]])
+ * }}}
+ *
+ * <p>would be transformed to
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *    +- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ *       +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3],
+ *            partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, 
pop=$2])
+ *          +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }}}
+ *
+ * <p>To match the Correlate, the LHS needs to be a global Aggregate on a 
scan, the RHS should
+ * be a Sort with an equal Filter predicate whose keys are same with the LHS 
grouping keys.
+ *
+ * <p>This rule can only be used in HepPlanner.
+ */
+class CorrelateSortToRankRule extends RelOptRule(
+  operand(classOf[Correlate],
+    operand(classOf[Aggregate],
+      operand(classOf[Project], any())),
+    operand(classOf[Sort],
+      operand(classOf[Project],
+        operand(classOf[Filter], any())))),
+  "CorrelateSortToRankRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val correlate: Correlate = call.rel(0)
+    if (correlate.getJoinType != JoinRelType.INNER) {
+      return false
+    }
+    val agg: Aggregate = call.rel(1)
+    if (agg.getAggCallList.size() > 0
+      || agg.getGroupSets.size() > 1
+      || agg.getGroupSet.cardinality() != 1) {
+      // only one group field is supported now
+      return false

Review comment:
       Maybe I am missing something (probably I am), but why do we need that 
limit?
   
   From the first glance it looks very tailor made for the single example.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
##########
@@ -1305,4 +1308,50 @@ class RankITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     assertEquals(expected2.sorted, sink2.getRetractResults.sorted)
   }
 
+  @Test
+  def testCorrelateSortToRank(): Unit = {
+    val citiesDataId = TestValuesTableFactory.registerData(TestData.citiesData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE cities (
+         |  name STRING NOT NULL,
+         |  state STRING NOT NULL,
+         |  pop INT NOT NULL
+         |) WITH (
+         | 'connector' = 'values',
+         | 'data-id' = '$citiesDataId',
+         | 'changelog-mode' = 'I'
+         |)
+         |""".stripMargin)
+
+    val query =
+      s"""
+         |SELECT state, name
+         |FROM
+         |  (SELECT DISTINCT state FROM cities) states,
+         |  LATERAL (
+         |    SELECT name, pop
+         |    FROM cities
+         |    WHERE state = states.state
+         |    ORDER BY pop
+         |    DESC LIMIT 3
+         |  )
+      """.stripMargin
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)

Review comment:
       nit: Can't we use Table#collect and assert lists of rows?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelCollations
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Filter, JoinRelType, 
Project, Sort}
+import org.apache.calcite.rex.{RexCall, RexCorrelVariable, RexFieldAccess, 
RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites sort correlation to a Rank.
+ * Typically, the following plan
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+ *      :- LogicalAggregate(group=[{0}])
+ *      :  +- LogicalProject(state=[$1])
+ *      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ *      +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
+ *         +- LogicalProject(name=[$0], pop=[$2])
+ *            +- LogicalFilter(condition=[=($1, $cor0.state)])
+ *               +- LogicalTableScan(table=[[default_catalog, 
default_database, cities]])
+ * }}}
+ *
+ * <p>would be transformed to
+ *
+ * {{{
+ *   LogicalProject(state=[$0], name=[$1])
+ *    +- LogicalProject(state=[$1], name=[$0], pop=[$2])
+ *       +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3],
+ *            partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, 
pop=$2])
+ *          +- LogicalTableScan(table=[[default_catalog, default_database, 
cities]])
+ * }}}
+ *
+ * <p>To match the Correlate, the LHS needs to be a global Aggregate on a 
scan, the RHS should
+ * be a Sort with an equal Filter predicate whose keys are same with the LHS 
grouping keys.
+ *
+ * <p>This rule can only be used in HepPlanner.
+ */
+class CorrelateSortToRankRule extends RelOptRule(

Review comment:
       nit: What about your own comment from 
https://github.com/apache/flink/pull/13291/files#r480880262 ;)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to