This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 094cd53  [FLINK-18629] Add type to ConnectedStreams#keyBy
094cd53 is described below

commit 094cd530c052c2f0776883521e71c53748e7eb2c
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Jul 21 11:03:46 2020 +0200

    [FLINK-18629] Add type to ConnectedStreams#keyBy
    
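    Adding a generic KEY type parameter to the method makes it possible to
    propagate the key type from a lambda function. Otherwise the wildcard
    type '?' is resolved to Object, and the TypeExtractor therefore extracts
    a GenericTypeInfo<Object> for the key. As a consequence, both key
    selectors must now return the same KEY type.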
---
 .../streaming/api/datastream/ConnectedStreams.java |  4 +++-
 .../streaming/api/datastream/IterativeStream.java  |  2 +-
 .../apache/flink/streaming/api/DataStreamTest.java | 19 ++++++++++++++++
 flink-streaming-scala/pom.xml                      |  1 +
 .../streaming/api/scala/ConnectedStreams.scala     | 26 ++++++++--------------
 .../streaming/api/scala/DataStreamUtils.scala      | 12 +++++-----
 .../acceptPartialFunctions/OnConnectedStream.scala |  2 +-
 7 files changed, 41 insertions(+), 25 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 6060a29..5a796f8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -188,7 +188,9 @@ public class ConnectedStreams<IN1, IN2> {
         *            The {@link KeySelector} used for grouping the second input
         * @return The partitioned {@link ConnectedStreams}
         */
-       public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> 
keySelector1, KeySelector<IN2, ?> keySelector2) {
+       public <KEY> ConnectedStreams<IN1, IN2> keyBy(
+                       KeySelector<IN1, KEY> keySelector1,
+                       KeySelector<IN2, KEY> keySelector2) {
                return new ConnectedStreams<>(environment, 
inputStream1.keyBy(keySelector1),
                                inputStream2.keyBy(keySelector2));
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index 48bccfe..430241a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -202,7 +202,7 @@ public class IterativeStream<T> extends 
SingleOutputStreamOperator<T> {
                }
 
                @Override
-               public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> 
keySelector1, KeySelector<F, ?> keySelector2) {
+               public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> 
keySelector1, KeySelector<F, KEY> keySelector2) {
                        throw groupingException;
                }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 788385b..dcd6f5f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -96,9 +97,11 @@ import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1027,6 +1030,22 @@ public class DataStreamTest extends TestLogger {
        }
 
        @Test
+       public void testKeyedConnectedStreamsType() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Integer> stream1 = env.fromElements(1, 2);
+               DataStreamSource<Integer> stream2 = env.fromElements(1, 2);
+
+               ConnectedStreams<Integer, Integer> connectedStreams = 
stream1.connect(stream2)
+                       .keyBy(v -> v, v -> v);
+
+               KeyedStream<?, ?> firstKeyedInput = (KeyedStream<?, ?>) 
connectedStreams.getFirstInput();
+               KeyedStream<?, ?> secondKeyedInput = (KeyedStream<?, ?>) 
connectedStreams.getSecondInput();
+               assertThat(firstKeyedInput.getKeyType(), equalTo(Types.INT));
+               assertThat(secondKeyedInput.getKeyType(), equalTo(Types.INT));
+       }
+
+       @Test
        public void sinkKeyTest() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index fadb3fe..2554969 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -261,6 +261,7 @@ under the License.
                                                        Can be removed once 
https://github.com/siom79/japicmp/issues/176 will be fixed -->
                                                        
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude>
                                                        
<exclude>org.apache.flink.streaming.api.scala.DataStream#assignTimestamps(org.apache.flink.streaming.api.functions.TimestampExtractor)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.ConnectedStreams#keyBy(scala.Function1,scala.Function1,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
                                                </excludes>
                                        </parameter>
                                </configuration>
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index caaf22d..3f0e141 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => 
JavaCStream, DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co._
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
@@ -300,19 +299,18 @@ class ConnectedStreams[IN1, IN2](javaStream: 
JavaCStream[IN1, IN2]) {
    * @param fun2 The second stream's key function
    * @return The key-grouped connected streams
    */
-  def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2: 
IN2 => K2):
+  def keyBy[KEY: TypeInformation](fun1: IN1 => KEY, fun2: IN2 => KEY):
       ConnectedStreams[IN1, IN2] = {
 
-    val keyType1 = implicitly[TypeInformation[K1]]
-    val keyType2 = implicitly[TypeInformation[K2]]
-    
+    val keyType = implicitly[TypeInformation[KEY]]
+
     val cleanFun1 = clean(fun1)
     val cleanFun2 = clean(fun2)
-    
-    val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
-    val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
-    
-    asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2))
+
+    val keyExtractor1 = new JavaKeySelector[IN1, KEY](cleanFun1)
+    val keyExtractor2 = new JavaKeySelector[IN2, KEY](cleanFun2)
+
+    asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2, keyType))
   }
 
   /**
@@ -332,12 +330,6 @@ class ConnectedStreams[IN1, IN2](javaStream: 
JavaCStream[IN1, IN2]) {
 }
 
 @Internal
-class KeySelectorWithType[IN, K](
-        private[this] val fun: IN => K,
-        private[this] val info: TypeInformation[K])
-  extends KeySelector[IN, K] with ResultTypeQueryable[K] {
-  
+class JavaKeySelector[IN, K](private[this] val fun: IN => K) extends 
KeySelector[IN, K] {
   override def getKey(value: IN): K = fun(value)
-
-  override def getProducedType: TypeInformation[K] = info
 }
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
index fd48f9c..079701d 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.annotation.Experimental
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.datastream.{DataStreamUtils => 
JavaStreamUtils}
 
 import scala.collection.JavaConverters._
@@ -46,11 +47,11 @@ class DataStreamUtils[T: TypeInformation : ClassTag](val 
self: DataStream[T]) {
 
   /**
     * Reinterprets the given [[DataStream]] as a [[KeyedStream]], which 
extracts keys with the
-    * given [[KeySelectorWithType]].
+    * given [[KeySelector]].
     *
     * IMPORTANT: For every partition of the base stream, the keys of events in 
the base stream
     * must be partitioned exactly in the same way as if it was created through 
a
-    * [[DataStream#keyBy(KeySelectorWithType)]].
+    * [[DataStream#keyBy(KeySelector)]].
     *
     * @param keySelector Function that defines how keys are extracted from the 
data stream.
     * @return The reinterpretation of the [[DataStream]] as a [[KeyedStream]].
@@ -58,11 +59,12 @@ class DataStreamUtils[T: TypeInformation : ClassTag](val 
self: DataStream[T]) {
   def reinterpretAsKeyedStream[K: TypeInformation](
         keySelector: T => K): KeyedStream[T, K] = {
 
-    val keySelectorWithType =
-      new KeySelectorWithType[T, K](clean(keySelector), 
implicitly[TypeInformation[K]])
+    val keyTypeInfo = implicitly[TypeInformation[K]]
+    val cleanSelector = clean(keySelector)
+    val javaKeySelector = new JavaKeySelector[T, K](cleanSelector)
 
     asScalaStream(
-      JavaStreamUtils.reinterpretAsKeyedStream(self.javaStream, 
keySelectorWithType))
+      JavaStreamUtils.reinterpretAsKeyedStream(self.javaStream, 
javaKeySelector, keyTypeInfo))
   }
 
   private[flink] def clean[F <: AnyRef](f: F): F = {
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
index deb03a3..bc6733c 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
@@ -72,7 +72,7 @@ class OnConnectedStream[IN1, IN2](stream: 
ConnectedStreams[IN1, IN2]) {
     * @return The key-grouped connected streams
     */
   @PublicEvolving
-  def keyingBy[K1: TypeInformation, K2: TypeInformation](key1: IN1 => K1, 
key2: IN2 => K2):
+  def keyingBy[KEY: TypeInformation](key1: IN1 => KEY, key2: IN2 => KEY):
       ConnectedStreams[IN1, IN2] =
     stream.keyBy(key1, key2)
 

Reply via email to