This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc62f22ecb7fb2830cff211edccaffba09600f6d
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Tue Oct 2 12:03:07 2018 +0200

    [FLINK-7811] Add DataStream.keyBy() that takes KeySelector
    
    Previously we only allowed a lambda here, which was an omission.
    
    This also adds support for KeySelector on other operations that need a
    key.
---
 .../api/scala/unfinishedKeyPairOperation.scala     | 32 ++++++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala     | 12 ++++++++
 2 files changed, 44 insertions(+)

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
index 4620075..da93ca9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -90,6 +90,19 @@ private[flink] abstract class UnfinishedKeyPairOperation[L, R, O](
     val leftKey = new Keys.SelectorFunctionKeys[L, K](keyExtractor, leftInput.getType, keyType)
     new HalfUnfinishedKeyPairOperation[L, R, O](this, leftKey)
   }
+
+  /**
+   * Specify the key selector function for the left side of the key based operation. This returns
+   * a [[HalfUnfinishedKeyPairOperation]] on which `equalTo` must be called to specify the
+   * key for the right side. The result after specifying the right side key is the finished
+   * operation.
+   */
+  def where[K: TypeInformation](fun: KeySelector[L, K]) = {
+    val keyType = implicitly[TypeInformation[K]]
+    val leftKey =
+      new Keys.SelectorFunctionKeys[L, K](leftInput.clean(fun), leftInput.getType, keyType)
+    new HalfUnfinishedKeyPairOperation[L, R, O](this, leftKey)
+  }
 }
 
 @Internal
@@ -140,6 +153,25 @@ private[flink] class HalfUnfinishedKeyPairOperation[L, R, O](
       keyExtractor,
       unfinished.rightInput.getType,
       keyType)
+
+    if (!leftKey.areCompatible(rightKey)) {
+      throw new InvalidProgramException("The types of the key fields do not match. Left: " +
+        leftKey + " Right: " + rightKey)
+    }
+    unfinished.finish(leftKey, rightKey)
+  }
+
+  /**
+   * Specify the key selector function for the right side of the key based operation. This returns
+   * the finished operation.
+   */
+  def equalTo[K: TypeInformation](fun: KeySelector[R, K]): O = {
+
+    val keyType = implicitly[TypeInformation[K]]
+    val rightKey = new Keys.SelectorFunctionKeys[R, K](
+      unfinished.leftInput.clean(fun),
+      unfinished.rightInput.getType,
+      keyType)
     
     if (!leftKey.areCompatible(rightKey)) {
       throw new InvalidProgramException("The types of the key fields do not match. Left: " +
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 247f54c..23d2165 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -415,6 +415,18 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K] = {
+
+    val cleanFun = clean(fun)
+    val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
+
+    asScalaStream(new JavaKeyedStream(stream, cleanFun, keyType))
+  }
+
+  /**
    * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
    * This method takes the key position to partition on, and a partitioner that accepts the key
    * type.

Reply via email to