godfreyhe commented on a change in pull request #14516:
URL: https://github.com/apache/flink/pull/14516#discussion_r555526415



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -121,16 +121,24 @@ public StreamExecRank(
         int[] sortFields = sortSpec.getFieldIndices();
         RowDataKeySelector sortKeySelector =
                 KeySelectorUtil.getRowDataSelector(sortFields, 
inputRowTypeInfo);
+        // create a sort spec on sort keys.
         int[] sortKeyPositions = IntStream.range(0, 
sortFields.length).toArray();
+        SortSpec.SortSpecBuilder builder = SortSpec.builder();
+        IntStream.range(0, sortFields.length)
+                .forEach(
+                        idx ->
+                                builder.addField(
+                                        idx,
+                                        
sortSpec.getFieldSpec(idx).getIsAscendingOrder(),
+                                        
sortSpec.getFieldSpec(idx).getNullIsLast()));
+        SortSpec sortSpecInSortKey = builder.build();

Review comment:
       How about adding a `createSubSortSpec(int startIndex, int endIndex)` method 
to `SortSpec` and replacing this code snippet with a call to the new method?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
##########
@@ -36,25 +37,27 @@ object ComparatorCodeGenerator {
     * @param conf        Table config.
     * @param name        Class name of the function.
     *                    Does not need to be unique but has to be a valid Java 
class identifier.
-    * @param keys        key positions describe which fields are keys in what 
order.
-    * @param keyTypes    types for the key fields, in the same order as the 
key fields.
-    * @param orders      sorting orders for the key fields.
-    * @param nullsIsLast Ordering of nulls.
+    * @param input        input type.
+    * @param sortSpec     sort specification.
     * @return A GeneratedRecordComparator
     */
   def gen(
       conf: TableConfig,
       name: String,
-      keys: Array[Int],
-      keyTypes: Array[LogicalType],
-      orders: Array[Boolean],
-      nullsIsLast: Array[Boolean]): GeneratedRecordComparator = {
+      input: RowType,
+      sortSpec: SortSpec): GeneratedRecordComparator = {
     val className = newName(name)
     val baseClass = classOf[RecordComparator]
 
     val ctx = new CodeGeneratorContext(conf)
     val compareCode = GenerateUtils.generateRowCompare(
-      ctx, keys, keyTypes, orders, nullsIsLast, "o1", "o2")
+        ctx,
+        sortSpec.getFieldIndices,
+        sortSpec.getFieldTypes(input),
+        sortSpec.getAscendingOrders,
+        sortSpec.getNullsIsLast,

Review comment:
       How about changing `GenerateUtils.generateRowCompare` to accept a 
`SortSpec` instead of the separate sort-related parameters?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to