[
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261094#comment-15261094
]
ASF GitHub Bot commented on FLINK-2946:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1926#discussion_r61348811
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inp: RelNode,
+ collations: RelCollation,
+ rowType2: RelDataType)
+ extends SingleRel(cluster, traitSet, inp)
+ with DataSetRel {
+
+ override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+ new DataSetSort(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ collations,
+ rowType2
+ )
+ }
+
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+
+ val config = tableEnv.getConfig
+
+ val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+ val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+ var partitionedDs = if (currentParallelism == 1) {
+ inputDS
+ } else {
+ inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+ .withOrders(fieldCollations.map(_._2): _*)
+ }
+
+ fieldCollations.foreach { fieldCollation =>
+ partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
+ }
+
+ val inputType = partitionedDs.getType
+ expectedType match {
+
+ case None if config.getEfficientTypeUsage =>
+ partitionedDs
+
+ case _ =>
+ val determinedType = determineReturnType(
+ getRowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage)
+
+ // conversion
+ if (determinedType != inputType) {
+
+ val mapFunc = getConversionMapper(config,
+ partitionedDs.getType,
+ determinedType,
+ "DataSetSortConversion",
+ getRowType.getFieldNames.asScala
+ )
+
+ partitionedDs.map(mapFunc)
+ }
+ // no conversion necessary, forward
+ else {
+ partitionedDs
+ }
+ }
+ }
+
+ private def directionToOrder(direction: Direction) = {
+ direction match {
+ case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+ case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+ }
+
+ }
+
+ private val fieldCollations = collations.getFieldCollations.asScala
+ .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+ private val sortFieldsToString = fieldCollations
+ .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}").mkString(", ")
+
+ override def toString: String = s"Sort(by: $sortFieldsToString)"
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .item("by", sortFieldsToString)
--- End diff ---
"by" -> "orderBy"
> Add orderBy() to Table API
> --------------------------
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
> Issue Type: New Feature
> Components: Table API
> Reporter: Timo Walther
> Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a
> sorting feature are very welcome. Is there a more efficient way than
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on
> the nodes first and then do a final sort on a single node?
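For illustration, a rough sketch of the two alternatives discussed in the question, written against the Java DataSet API as used from Scala in the patch above. The object name, sample data, and key index 0 are made up; variant B assumes the partitionByRange(...).withOrders(...) combination that the patch relies on.

{code}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2

object SortSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input; the key index (0) and the sample data are made up.
    val input = env.fromElements(
      Tuple2.of(Int.box(3), "c"),
      Tuple2.of(Int.box(1), "a"),
      Tuple2.of(Int.box(2), "b"))

    // Variant A (from the question above): one final sort with parallelism 1.
    val singleNodeSort = input
      .sortPartition(0, Order.ASCENDING)
      .setParallelism(1)

    // Variant B (what the patch above does): range-partition on the sort key,
    // then sort every partition locally; the partitions read in range order
    // form a total order without funneling all data through a single node.
    val parallelSort = input
      .partitionByRange(0)
      .withOrders(Order.ASCENDING)
      .sortPartition(0, Order.ASCENDING)

    // print() triggers execution of both sketches.
    singleNodeSort.print()
    parallelSort.print()
  }
}
{code}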
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)