Repository: flink
Updated Branches:
  refs/heads/master f2186a604 -> 5c2c112b2
[hotfix] Add ResultTypeQueryable to Keys in Stream CoGroup/Join Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c2c112b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c2c112b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c2c112b Branch: refs/heads/master Commit: 5c2c112b2b2d3a8f14d7cd82da940d11feb8e097 Parents: f2186a6 Author: Aljoscha Krettek <[email protected]> Authored: Wed Oct 7 22:59:08 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 7 22:59:56 2015 +0200 ---------------------------------------------------------------------- .../streaming/api/scala/CoGroupedStreams.scala | 33 +++++++++++++------- .../streaming/api/scala/JoinedStreams.scala | 32 ++++++++++++------- 2 files changed, 42 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala index 1b16e44..0164b92 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.common.functions.CoGroupFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector -import 
org.apache.flink.api.scala._ +import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams} import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner import org.apache.flink.streaming.api.windowing.evictors.Evictor @@ -69,23 +69,27 @@ object CoGroupedStreams { /** * Specifies a [[KeySelector]] for elements from the first input. */ - def where[KEY](keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { + def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T1, KEY] { + val keyType = implicitly[TypeInformation[KEY]] + val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T1) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = keyType } - new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null) + new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType) } /** * Specifies a [[KeySelector]] for elements from the second input. 
*/ - def equalTo[KEY](keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { + def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T2, KEY] { + val keyType = implicitly[TypeInformation[KEY]] + val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T2) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = keyType } - new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector) + new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType) } /** @@ -112,17 +116,20 @@ object CoGroupedStreams { input1: DataStream[T1], input2: DataStream[T2], keySelector1: KeySelector[T1, KEY], - keySelector2: KeySelector[T2, KEY]) { + keySelector2: KeySelector[T2, KEY], + keyType: TypeInformation[KEY]) { /** * Specifies a [[KeySelector]] for elements from the first input. */ def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T1, KEY] { + val localKeyType = keyType + val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T1) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = localKeyType } - new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2) + new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, keyType) } /** @@ -130,10 +137,12 @@ object CoGroupedStreams { */ def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T2, KEY] { + val localKeyType = keyType + val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T2) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = localKeyType } - new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, 
javaSelector) + new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, keyType) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala index be059b8..2fda32d 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams} import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner import org.apache.flink.streaming.api.windowing.evictors.Evictor @@ -66,23 +67,27 @@ object JoinedStreams { /** * Specifies a [[KeySelector]] for elements from the first input. 
*/ - def where[KEY](keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { + def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T1, KEY] { + val keyType = implicitly[TypeInformation[KEY]] + val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T1) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = keyType } - new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null) + new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType) } /** * Specifies a [[KeySelector]] for elements from the second input. */ - def equalTo[KEY](keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { + def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T2, KEY] { + val keyType = implicitly[TypeInformation[KEY]] + val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T2) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = keyType } - new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector) + new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType) } /** @@ -109,17 +114,20 @@ object JoinedStreams { input1: DataStream[T1], input2: DataStream[T2], keySelector1: KeySelector[T1, KEY], - keySelector2: KeySelector[T2, KEY]) { + keySelector2: KeySelector[T2, KEY], + keyType: TypeInformation[KEY]) { /** * Specifies a [[KeySelector]] for elements from the first input. 
*/ def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T1, KEY] { + val localKeyType = keyType + val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T1) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = localKeyType } - new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2) + new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, localKeyType) } /** @@ -127,10 +135,12 @@ object JoinedStreams { */ def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { val cleanFun = clean(keySelector) - val javaSelector = new KeySelector[T2, KEY] { + val localKeyType = keyType + val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { def getKey(in: T2) = cleanFun(in) + override def getProducedType: TypeInformation[KEY] = localKeyType } - new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector) + new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, localKeyType) } /**
