[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15330530#comment-15330530 ]
ASF GitHub Bot commented on FLINK-3650:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1856#discussion_r67050779
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
def output(outputFormat: OutputFormat[T]): DataSink[T] = {
javaSet.output(outputFormat)
}
-
+
+  /**
+   * Selects an element with minimum value.
+   * <p>
+   * The minimum is computed over the specified fields in lexicographical order.
+   * <p>
+   * <strong>Example 1</strong>: Given a data set with elements <code>[0, 1], [1, 0]</code>, the
+   * results will be:
+   * <ul>
+   *   <li><code>minBy(0)</code>: <code>[0, 1]</code></li>
+   *   <li><code>minBy(1)</code>: <code>[1, 0]</code></li>
+   * </ul>
+   * <p>
+   * <strong>Example 2</strong>: Given a data set with elements <code>[0, 0], [0, 1]</code>, the
+   * results will be:
+   * <ul>
+   *   <li><code>minBy(0, 1)</code>: <code>[0, 0]</code></li>
+   * </ul>
+   * <p>
+   * If multiple values with minimum value at the specified fields exist, a random one will be
+   * picked.
+   * <p>
+   * Internally, this operation is implemented as a {@link ReduceFunction}.
+   */
+  def minBy(fields: Array[Int]): ReduceOperator[T] = {
+    if (!getType.isTupleType) {
+      throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+    }
+
+    new ReduceOperator[T](
+      javaSet,
+      new SelectByMinFunction[T](getType(), fields),
+      getCallLocationName())
+  }
+
+  /**
+   * Selects an element with maximum value.
+   * <p>
+   * The maximum is computed over the specified fields in lexicographical order.
+   * <p>
+   * <strong>Example 1</strong>: Given a data set with elements <code>[0, 1], [1, 0]</code>, the
+   * results will be:
+   * <ul>
+   *   <li><code>maxBy(0)</code>: <code>[1, 0]</code></li>
+   *   <li><code>maxBy(1)</code>: <code>[0, 1]</code></li>
+   * </ul>
+   * <p>
+   * <strong>Example 2</strong>: Given a data set with elements <code>[0, 0], [0, 1]</code>, the
+   * results will be:
+   * <ul>
+   *   <li><code>maxBy(0, 1)</code>: <code>[0, 1]</code></li>
+   * </ul>
+   * <p>
+   * If multiple values with maximum value at the specified fields exist, a random one will be
+   * picked.
+   * <p>
+   * Internally, this operation is implemented as a {@link ReduceFunction}.
+   */
+  def maxBy(fields: Array[Int]): ReduceOperator[T] = {
--- End diff ---
Comments on `minBy()` apply here as well.
Also please move these methods up between `combineGroup` and `first`.
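For context, the selection semantics documented in the Javadoc above can be modeled in plain Scala: compare tuples lexicographically on the given field positions, then reduce pairwise (mirroring the `ReduceFunction`-based implementation). This is an illustrative sketch only, not the Flink operator; `lessOnFields`, `minByFields`, and `maxByFields` are hypothetical names.

```scala
// Plain-Scala model of the minBy/maxBy semantics described in the Javadoc.
// Hypothetical helper names; not part of the Flink API.
object SelectBySketch {
  // Lexicographic "less than" over the given field positions of Int tuples:
  // the first field whose comparison is non-zero decides the order.
  def lessOnFields(a: Product, b: Product, fields: Seq[Int]): Boolean =
    fields.iterator
      .map(f => a.productElement(f).asInstanceOf[Int]
                 .compare(b.productElement(f).asInstanceOf[Int]))
      .find(_ != 0)
      .getOrElse(0) < 0

  // Pairwise reduction: keep whichever element is smaller (min) on the fields.
  def minByFields(data: Seq[(Int, Int)], fields: Int*): (Int, Int) =
    data.reduce((a, b) => if (lessOnFields(b, a, fields)) b else a)

  // Pairwise reduction: keep whichever element is larger (max) on the fields.
  def maxByFields(data: Seq[(Int, Int)], fields: Int*): (Int, Int) =
    data.reduce((a, b) => if (lessOnFields(a, b, fields)) b else a)
}
```

With the data from Example 1, `minByFields(Seq((0, 1), (1, 0)), 0)` yields `(0, 1)` and `maxByFields(Seq((0, 1), (1, 0)), 0)` yields `(1, 0)`, matching the documented results. On ties this sketch keeps the first element seen, whereas the documented operator makes no guarantee about which tied element is returned.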
> Add maxBy/minBy to Scala DataSet API
> ------------------------------------
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
> Issue Type: Improvement
> Components: Java API, Scala API
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}.
> These methods are not supported by the Scala DataSet API. These methods
> should be added in order to have a consistent API.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)