Repository: flink
Updated Branches:
  refs/heads/master f2186a604 -> 5c2c112b2


[hotfix] Add ResultTypeQueryable to Keys in Stream CoGroup/Join


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c2c112b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c2c112b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c2c112b

Branch: refs/heads/master
Commit: 5c2c112b2b2d3a8f14d7cd82da940d11feb8e097
Parents: f2186a6
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Oct 7 22:59:08 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 7 22:59:56 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/scala/CoGroupedStreams.scala  | 33 +++++++++++++-------
 .../streaming/api/scala/JoinedStreams.scala     | 32 ++++++++++++-------
 2 files changed, 42 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 1b16e44..0164b92 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala._
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => 
JavaCoGroupedStreams}
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
@@ -69,23 +69,27 @@ object CoGroupedStreams {
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
-    def where[KEY](keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, 
KEY] = {
+    def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, 
KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T1, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, 
null)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
     }
 
     /**
      * Specifies a [[KeySelector]] for elements from the second input.
      */
-    def equalTo[KEY](keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, 
KEY] = {
+    def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, 
KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T2, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, null, 
javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
     }
 
     /**
@@ -112,17 +116,20 @@ object CoGroupedStreams {
       input1: DataStream[T1],
       input2: DataStream[T2],
       keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY]) {
+      keySelector2: KeySelector[T2, KEY],
+      keyType: TypeInformation[KEY]) {
 
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
     def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = 
{
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T1, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, 
keySelector2)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, 
keyType)
     }
 
     /**
@@ -130,10 +137,12 @@ object CoGroupedStreams {
      */
     def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] 
= {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T2, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, 
javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, 
keyType)
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index be059b8..2fda32d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{JoinedStreams => 
JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
@@ -66,23 +67,27 @@ object JoinedStreams {
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
-    def where[KEY](keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] 
= {
+    def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, 
KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T1, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, 
null)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
     }
 
     /**
      * Specifies a [[KeySelector]] for elements from the second input.
      */
-    def equalTo[KEY](keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, 
KEY] = {
+    def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, 
KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T2, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, null, 
javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
     }
 
     /**
@@ -109,17 +114,20 @@ object JoinedStreams {
       input1: DataStream[T1],
       input2: DataStream[T2],
       keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY]) {
+      keySelector2: KeySelector[T2, KEY],
+      keyType: TypeInformation[KEY]) {
 
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
     def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T1, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, 
keySelector2)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, 
localKeyType)
     }
 
     /**
@@ -127,10 +135,12 @@ object JoinedStreams {
      */
     def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T2, KEY] with 
ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, 
javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, 
localKeyType)
     }
 
     /**

Reply via email to