Repository: flink
Updated Branches:
  refs/heads/master b63e19b09 -> efc344a4e


[FLINK-3650] [dataSet] Add maxBy/minBy to Scala DataSet API

This closes #1856


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cc69434
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cc69434
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cc69434

Branch: refs/heads/master
Commit: 7cc69434a21e23c871d091b0ba8308567a893cde
Parents: b63e19b
Author: Vasudevan <ramkrishna.s.vasude...@intel.com>
Authored: Wed Apr 6 11:43:07 2016 +0530
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Jun 17 00:13:34 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala    |  65 +++++-
 .../apache/flink/api/scala/GroupedDataSet.scala |  29 +++
 .../flink/api/scala/SelectByMaxFunction.scala   |  59 +++++
 .../flink/api/scala/SelectByMinFunction.scala   |  60 +++++
 .../flink/api/operator/MaxByOperatorTest.scala  | 169 ++++++++++++++
 .../flink/api/operator/MinByOperatorTest.scala  | 173 +++++++++++++++
 .../api/operator/SelectByFunctionTest.scala     | 219 +++++++++++++++++++
 7 files changed, 770 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
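
For illustration, a minimal usage sketch of the API added by this commit
(hypothetical data and job setup, not part of the change itself):

    import org.apache.flink.api.scala._

    object MinMaxByExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ds = env.fromElements((0, 1), (1, 0))

        // Element with the minimum value at field 0: (0, 1)
        ds.minBy(0).print()

        // Element with the maximum value at field 1: (0, 1)
        ds.maxBy(1).print()
      }
    }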


http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5735b32..4e7be04 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala
 
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
@@ -35,7 +35,8 @@ import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
 import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.{DataSet => JavaDataSet, Utils}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.java.{Utils, DataSet => JavaDataSet}
 import org.apache.flink.api.scala.operators.{ScalaAggregateOperator, ScalaCsvOutputFormat}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.{FileSystem, Path}
@@ -262,7 +263,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       case udfOp: UdfOperator[_] => udfOp.withParameters(parameters)
       case source: DataSource[_] => source.withParameters(parameters)
       case _ =>
-        throw new UnsupportedOperationException("Operator " + javaSet.toString 
+        throw new UnsupportedOperationException("Operator " + javaSet.toString
             + " cannot have parameters")
     }
     this
@@ -699,6 +700,62 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+    * Selects an element with the minimum value.
+    *
+    * The minimum is computed over the specified fields in lexicographical order.
+    *
+    * Example 1: Given a data set with elements [0, 1], [1, 0], the
+    * results will be:
+    * {{{
+    *   minBy(0)[0, 1]
+    *   minBy(1)[1, 0]
+    * }}}
+    * Example 2: Given a data set with elements [0, 0], [0, 1], the
+    * results will be:
+    * {{{
+    *   minBy(0, 1)[0, 0]
+    * }}}
+    * If multiple values with minimum value at the specified fields exist, a random one will be
+    * picked.
+    * Internally, this operation is implemented as a [[ReduceFunction]].
+    */
+  def minBy(fields: Int*): DataSet[T] = {
+    if (!getType.isTupleType) {
+      throw new InvalidProgramException("DataSet#minBy(int...) only works on 
Tuple types.")
+    }
+
+    reduce(new SelectByMinFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
+
+  /**
+    * Selects an element with the maximum value.
+    *
+    * The maximum is computed over the specified fields in lexicographical order.
+    *
+    * Example 1: Given a data set with elements [0, 1], [1, 0], the
+    * results will be:
+    * {{{
+    *   maxBy(0)[1, 0]
+    *   maxBy(1)[0, 1]
+    * }}}
+    * Example 2: Given a data set with elements [0, 0], [0, 1], the
+    * results will be:
+    * {{{
+    *   maxBy(0, 1)[0, 1]
+    * }}}
+    * If multiple values with maximum value at the specified fields exist, a random one will be
+    * picked.
+    * Internally, this operation is implemented as a [[ReduceFunction]].
+    *
+    */
+  def maxBy(fields: Int*): DataSet[T] = {
+    if (!getType.isTupleType) {
+      throw new InvalidProgramException("DataSet#maxBy(int...) only works on 
Tuple types.")
+    }
+    reduce(new SelectByMaxFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
+
+  /**
    * Creates a new DataSet containing the first `n` elements of this DataSet.
    */
   def first(n: Int): DataSet[T] = {
@@ -1599,7 +1656,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-  
+
   /**
   * Prints the elements in a DataSet to the standard output stream [[System.out]] of the
   * JVM that calls the print() method. For programs that are executed in a cluster, this
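
To make the lexicographical, multi-field semantics documented above concrete,
a short sketch (hypothetical elements, mirroring Example 2 from the scaladoc):

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements((0, 0), (0, 1))

    // Fields are compared in the given order: field 0 ties, so field 1 decides.
    ds.minBy(0, 1).print() // (0, 0)
    ds.maxBy(0, 1).print() // (0, 1)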

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 8af0a4e..18dea07 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
 import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala.operators.ScalaAggregateOperator
 import org.apache.flink.util.Collector
 
@@ -356,6 +357,34 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+    * Applies a special case of a reduce transformation `maxBy` on a grouped [[DataSet]].
+    * The transformation consecutively calls a [[ReduceFunction]]
+    * until only a single element remains, which is the result of the transformation.
+    * A ReduceFunction combines two elements into one new element of the same type.
+    */
+  def maxBy(fields: Int*): DataSet[T] = {
+    if (!set.getType().isTupleType) {
+      throw new InvalidProgramException("GroupedDataSet#maxBy(int...) only 
works on Tuple types.")
+    }
+    reduce(new SelectByMaxFunction[T](set.getType.asInstanceOf[TupleTypeInfoBase[T]],
+      fields.toArray))
+  }
+
+  /**
+    * Applies a special case of a reduce transformation `minBy` on a grouped [[DataSet]].
+    * The transformation consecutively calls a [[ReduceFunction]]
+    * until only a single element remains, which is the result of the transformation.
+    * A ReduceFunction combines two elements into one new element of the same type.
+    */
+  def minBy(fields: Int*): DataSet[T] = {
+    if (!set.getType().isTupleType) {
+      throw new InvalidProgramException("GroupedDataSet#minBy(int...) only 
works on Tuple types.")
+    }
+    reduce(new SelectByMinFunction[T](set.getType.asInstanceOf[TupleTypeInfoBase[T]],
+      fields.toArray))
+  }
+
+  /**
    *  Applies a CombineFunction on a grouped [[DataSet]].  A
    *  CombineFunction is similar to a GroupReduceFunction but does not
    *  perform a full data exchange. Instead, the CombineFunction calls
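
A sketch of the grouped variants added above (hypothetical data; the reduction
runs per key rather than over the whole DataSet):

    val env = ExecutionEnvironment.getExecutionEnvironment
    val sales = env.fromElements(("books", 12), ("books", 40), ("games", 7))

    // For each key in field 0, keep the element with the maximum field 1:
    // expected ("books", 40) and ("games", 7).
    sales.groupBy(0).maxBy(1).print()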

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala
new file mode 100644
index 0000000..9cc5451
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t: TupleTypeInfoBase[T], fields: Array[Int])
+  extends ReduceFunction[T] {
+  for (f <- fields) {
+    if (f < 0 || f >= t.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "SelectByMaxFunction field position " + f + " is out of range.")
+    }
+
+    // Check whether type is comparable
+    if (!t.getTypeAt(f).isKeyType()) {
+      throw new IllegalArgumentException(
+        "SelectByMaxFunction supports only key(Comparable) types.")
+    }
+  }
+
+  override def reduce(value1: T, value2: T): T = {
+    for (f <- fields) {
+      val element1 = value1.asInstanceOf[Product].productElement(f).asInstanceOf[Comparable[Any]]
+      val element2 = value2.asInstanceOf[Product].productElement(f).asInstanceOf[Comparable[Any]]
+
+      val comp = element1.compareTo(element2)
+      // If comp is bigger than 0, element1 is bigger.
+      // Return the bigger element.
+      if (comp > 0) {
+        return value1
+      } else if (comp < 0) {
+        return value2
+      }
+    }
+    value1
+  }
+}
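
The ReduceFunction can also be exercised directly, as the tests further below
do; a sketch (assuming the implicit TypeInformation provided by
org.apache.flink.api.scala._):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
    import org.apache.flink.api.scala._

    val info = implicitly[TypeInformation[(Int, String)]]
      .asInstanceOf[TupleTypeInfoBase[(Int, String)]]

    val selectMax = new SelectByMaxFunction[(Int, String)](info, Array(0))
    // Field 0 of the first tuple is larger, so the first tuple is returned.
    selectMax.reduce((3, "a"), (1, "b")) // (3, "a")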

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala
new file mode 100644
index 0000000..71cdb84
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t: TupleTypeInfoBase[T], fields: Array[Int])
+  extends ReduceFunction[T] {
+  for (f <- fields) {
+    if (f < 0 || f >= t.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "SelectByMinFunction field position " + f + " is out of range.")
+    }
+
+    // Check whether type is comparable
+    if (!t.getTypeAt(f).isKeyType()) {
+      throw new IllegalArgumentException(
+        "SelectByMinFunction supports only key(Comparable) types.")
+    }
+  }
+
+  override def reduce(value1: T, value2: T): T = {
+    for (f <- fields) {
+      val element1 = value1.asInstanceOf[Product].productElement(f).asInstanceOf[Comparable[Any]]
+      val element2 = value2.asInstanceOf[Product].productElement(f).asInstanceOf[Comparable[Any]]
+
+      val comp = element1.compareTo(element2)
+
+      // If comp is smaller than 0, element1 is smaller.
+      // Return the smaller element.
+      if (comp < 0) {
+        return value1
+      } else if (comp > 0) {
+        return value2
+      }
+    }
+    value1
+  }
+}
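
Worth noting, and covered by the tests below: when all specified fields compare
equal, reduce() falls through the loop and returns its first argument (value1),
so which of several equal elements survives depends on the order in which the
runtime feeds them in. A sketch, reusing the hypothetical `info` from the
previous example:

    val selectMin = new SelectByMinFunction[(Int, String)](info, Array(0))
    // Equal at field 0: the first argument wins.
    selectMin.reduce((1, "first"), (1, "second")) // (1, "first")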

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
new file mode 100644
index 0000000..523cd5d
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+
+  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
+  private val customTypeData = List[CustomType]()
+
+  @Test
+  def testMaxByKeyFieldsDataset(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+    try {
+      collection.maxBy(0, 1, 2, 3, 4)
+    } catch {
+      case e : Exception => Assert.fail()
+    }
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(1, 2, 3, 4, -1)
+  }
+
+  /**
+    * This test validates that no exception is thrown when an empty grouping
+    * calls maxBy().
+    */
+  @Test
+  def testMaxByKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    // should work
+    try {
+      groupDs.maxBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail()
+    }
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when maxBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val customDS = env.fromCollection(customTypeData)
+    // should not work: groups on custom type
+    customDS.maxBy(0)
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when maxBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(0)
+
+    groupDs.maxBy(0)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping2() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping3() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(1, 2, 3, 4, -1)
+  }
+
+  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
+    def this() {
+      this(0, 0, "")
+    }
+
+    override def toString: String = {
+      myInt + "," + myLong + "," + myString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
new file mode 100644
index 0000000..f9d5249
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
+  private val customTypeData = List[CustomType]()
+
+  @Test
+  def testMinByKeyFieldsDataset(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+    try {
+      collection.minBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail()
+    }
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(1, 2, 3, 4, -1)
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when minBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val customDS = env.fromCollection(customTypeData)
+    // should not work: groups on custom type
+    customDS.minBy(0)
+  }
+
+  /**
+    * This test validates that no exception is thrown when an empty grouping
+    * calls minBy().
+    */
+  @Test
+  def testMinByKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    // should work
+    try {
+      groupDs.minBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail()
+    }
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when minBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(0)
+
+    groupDs.minBy(0)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping2() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping3() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(1, 2, 3, 4, -1)
+  }
+
+  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
+    def this() {
+      this(0, 0, "")
+    }
+
+    override def toString: String = {
+      myInt + "," + myLong + "," + myString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
new file mode 100644
index 0000000..291df79
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.junit.{Assert, Test}
+
+/**
+  * Tests for [[SelectByMaxFunction]] and [[SelectByMinFunction]].
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = implicitly[TypeInformation[(Int, Long, String, Long, Int)]]
+    .asInstanceOf[TupleTypeInfoBase[(Int, Long, String, Long, Int)]]
+
+  private val bigger = (10, 100L, "HelloWorld", 200L, 20)
+  private val smaller = (5, 50L, "Hello", 50L, 15)
+
+  // Special case where only the last value determines if bigger or smaller
+  private val specialCaseBigger = (10, 100L, "HelloWorld", 200L, 17)
+  private val specialCaseSmaller = (5, 50L, "Hello", 50L, 17)
+
+  /**
+    * This test validates whether the order of tuples has any impact
+    * on the outcome and if the bigger tuple is returned.
+    */
+  @Test
+  def testMaxByComparison(): Unit = {
+    val a1 = Array(0)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception =>
+        Assert.fail("No exception should be thrown while comparing both tuples")
+    }
+  }
+
+  // ----------------------- MAXIMUM FUNCTION TEST BELOW --------------------------
+
+  /**
+    * This test case checks when two tuples differ only in one value, but this value is not
+    * in the fields list. In that case they should be seen as equal
+    * and then the first given tuple (value1) should be returned by reduce().
+    */
+  @Test
+  def testMaxByComparisonSpecialCase1() : Unit = {
+    val a1 = Array(0, 3)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMax must return the first given tuple",
+        specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMax must return the first given tuple",
+        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comapring both tuples")
+    }
+  }
+
+  /**
+    * This test case checks when two tuples differ only in one value.
+    */
+  @Test
+  def testMaxByComparisonSpecialCase2() : Unit = {
+    val a1 = Array(0, 2, 1, 4, 3)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comapring both tuples")
+    }
+  }
+
+  /**
+    * This test validates that equality is independent of the number of used indices.
+    */
+  @Test
+  def testMaxByComparisonMultiple(): Unit = {
+    val a1 = Array(0, 1, 2, 3, 4)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comapring both tuples")
+    }
+  }
+
+  /**
+    * Checks whether reduce behaves as expected if both values are the same object.
+    */
+  @Test
+  def testMaxByComparisonMustReturnATuple() : Unit = {
+    val a1 = Array(0)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, bigger))
+      Assert.assertSame("SelectByMax must return smaller tuple",
+        smaller, maxByTuple.reduce(smaller, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comapring both tuples")
+    }
+  }
+
+  // ----------------------- MINIMUM FUNCTION TEST BELOW --------------------------
+
+  /**
+    * This test validates whether the order of tuples has any impact
+    * on the outcome and if the smaller tuple is returned.
+    */
+  @Test
+  def testMinByComparison() : Unit = {
+    val a1 = Array(0)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comapring both tuples")
+    }
+  }
+
+  /**
+    * This test case checks when two tuples differ only in one value, but this value is not
+    * in the fields list. In that case they should be seen as equal and
+    * then the first given tuple (value1) should be returned by reduce().
+    */
+  @Test
+  def testMinByComparisonSpecialCase1() : Unit = {
+    val a1 = Array(0, 3)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMin must return the first given tuple",
+        specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMin must return the first given tuple",
+        bigger, minByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comapring both tuples")
+    }
+  }
+
+  /**
+    * This test validates that when two tuples differ only in one value and that value's
+    * index is given at construction time, the smaller tuple is returned.
+    */
+  @Test
+  def testMinByComparisonSpecialCase2() : Unit = {
+    val a1 = Array(0, 2, 1, 4, 3)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(specialCaseSmaller, smaller))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, specialCaseSmaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comapring both tuples")
+    }
+  }
+
+  /**
+    * This test validates that the smaller tuple is returned
+    * independent of the number of used indices.
+    */
+  @Test
+  def testMinByComparisonMultiple() : Unit = {
+    val a1 = Array(0, 1, 2, 3, 4)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comapring both tuples")
+    }
+  }
+}
