This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cc62f22ecb7fb2830cff211edccaffba09600f6d
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Tue Oct 2 12:03:07 2018 +0200

    [FLINK-7811] Add DataStream.keyBy() that takes KeySelector

    Previously we only allowed a lambda here, which was an omission. This also
    adds support for KeySelector on other operations that need a key.
---
 .../api/scala/unfinishedKeyPairOperation.scala     | 32 ++++++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala     | 12 ++++++++
 2 files changed, 44 insertions(+)

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
index 4620075..da93ca9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -90,6 +90,19 @@ private[flink] abstract class UnfinishedKeyPairOperation[L, R, O](
     val leftKey = new Keys.SelectorFunctionKeys[L, K](keyExtractor, leftInput.getType, keyType)
     new HalfUnfinishedKeyPairOperation[L, R, O](this, leftKey)
   }
+
+  /**
+   * Specify the key selector function for the left side of the key based operation. This returns
+   * a [[HalfUnfinishedKeyPairOperation]] on which `equalTo` must be called to specify the
+   * key for the right side. The result after specifying the right side key is the finished
+   * operation.
+   */
+  def where[K: TypeInformation](fun: KeySelector[L, K]) = {
+    val keyType = implicitly[TypeInformation[K]]
+    val leftKey =
+      new Keys.SelectorFunctionKeys[L, K](leftInput.clean(fun), leftInput.getType, keyType)
+    new HalfUnfinishedKeyPairOperation[L, R, O](this, leftKey)
+  }
 }
 
 @Internal
@@ -140,6 +153,25 @@ private[flink] class HalfUnfinishedKeyPairOperation[L, R, O](
       keyExtractor,
       unfinished.rightInput.getType,
       keyType)
+
+    if (!leftKey.areCompatible(rightKey)) {
+      throw new InvalidProgramException("The types of the key fields do not match. Left: " +
+        leftKey + " Right: " + rightKey)
+    }
+    unfinished.finish(leftKey, rightKey)
+  }
+
+  /**
+   * Specify the key selector function for the right side of the key based operation. This returns
+   * the finished operation.
+   */
+  def equalTo[K: TypeInformation](fun: KeySelector[R, K]): O = {
+
+    val keyType = implicitly[TypeInformation[K]]
+    val rightKey = new Keys.SelectorFunctionKeys[R, K](
+      unfinished.leftInput.clean(fun),
+      unfinished.rightInput.getType,
+      keyType)
 
     if (!leftKey.areCompatible(rightKey)) {
       throw new InvalidProgramException("The types of the key fields do not match. Left: " +
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 247f54c..23d2165 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -415,6 +415,18 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K] = {
+
+    val cleanFun = clean(fun)
+    val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
+
+    asScalaStream(new JavaKeyedStream(stream, cleanFun, keyType))
+  }
+
+  /**
    * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
    * This method takes the key position to partition on, and a partitioner that accepts the key
    * type.
