This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 094cd53 [FLINK-18629] Add type to ConnectedStreams#keyBy
094cd53 is described below
commit 094cd530c052c2f0776883521e71c53748e7eb2c
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Jul 21 11:03:46 2020 +0200
[FLINK-18629] Add type to ConnectedStreams#keyBy
Adding a generic type to the method makes it possible to pass the type
from a lambda function. Otherwise a wildcard type '?' is derived as
Object and thus TypeExtractor extracts a GenericTypeInfo<Object> for the
key.
---
.../streaming/api/datastream/ConnectedStreams.java | 4 +++-
.../streaming/api/datastream/IterativeStream.java | 2 +-
.../apache/flink/streaming/api/DataStreamTest.java | 19 ++++++++++++++++
flink-streaming-scala/pom.xml | 1 +
.../streaming/api/scala/ConnectedStreams.scala | 26 ++++++++--------------
.../streaming/api/scala/DataStreamUtils.scala | 12 +++++-----
.../acceptPartialFunctions/OnConnectedStream.scala | 2 +-
7 files changed, 41 insertions(+), 25 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 6060a29..5a796f8 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -188,7 +188,9 @@ public class ConnectedStreams<IN1, IN2> {
* The {@link KeySelector} used for grouping the second input
* @return The partitioned {@link ConnectedStreams}
*/
- public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?>
keySelector1, KeySelector<IN2, ?> keySelector2) {
+ public <KEY> ConnectedStreams<IN1, IN2> keyBy(
+ KeySelector<IN1, KEY> keySelector1,
+ KeySelector<IN2, KEY> keySelector2) {
return new ConnectedStreams<>(environment,
inputStream1.keyBy(keySelector1),
inputStream2.keyBy(keySelector2));
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index 48bccfe..430241a 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -202,7 +202,7 @@ public class IterativeStream<T> extends
SingleOutputStreamOperator<T> {
}
@Override
- public ConnectedStreams<I, F> keyBy(KeySelector<I, ?>
keySelector1, KeySelector<F, ?> keySelector2) {
+ public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY>
keySelector1, KeySelector<F, KEY> keySelector2) {
throw groupingException;
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 788385b..dcd6f5f 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -31,6 +31,7 @@ import
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -96,9 +97,11 @@ import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1027,6 +1030,22 @@ public class DataStreamTest extends TestLogger {
}
@Test
+ public void testKeyedConnectedStreamsType() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Integer> stream1 = env.fromElements(1, 2);
+ DataStreamSource<Integer> stream2 = env.fromElements(1, 2);
+
+ ConnectedStreams<Integer, Integer> connectedStreams =
stream1.connect(stream2)
+ .keyBy(v -> v, v -> v);
+
+ KeyedStream<?, ?> firstKeyedInput = (KeyedStream<?, ?>)
connectedStreams.getFirstInput();
+ KeyedStream<?, ?> secondKeyedInput = (KeyedStream<?, ?>)
connectedStreams.getSecondInput();
+ assertThat(firstKeyedInput.getKeyType(), equalTo(Types.INT));
+ assertThat(secondKeyedInput.getKeyType(), equalTo(Types.INT));
+ }
+
+ @Test
public void sinkKeyTest() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index fadb3fe..2554969 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -261,6 +261,7 @@ under the License.
Can be removed once
https://github.com/siom79/japicmp/issues/176 will be fixed -->
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude>
<exclude>org.apache.flink.streaming.api.scala.DataStream#assignTimestamps(org.apache.flink.streaming.api.functions.TimestampExtractor)</exclude>
+
<exclude>org.apache.flink.streaming.api.scala.ConnectedStreams#keyBy(scala.Function1,scala.Function1,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
</excludes>
</parameter>
</configuration>
diff --git
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index caaf22d..3f0e141 100644
---
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{ConnectedStreams =>
JavaCStream, DataStream => JavaStream}
import org.apache.flink.streaming.api.functions.co._
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
@@ -300,19 +299,18 @@ class ConnectedStreams[IN1, IN2](javaStream:
JavaCStream[IN1, IN2]) {
* @param fun2 The second stream's key function
* @return The key-grouped connected streams
*/
- def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2:
IN2 => K2):
+ def keyBy[KEY: TypeInformation](fun1: IN1 => KEY, fun2: IN2 => KEY):
ConnectedStreams[IN1, IN2] = {
- val keyType1 = implicitly[TypeInformation[K1]]
- val keyType2 = implicitly[TypeInformation[K2]]
-
+ val keyType = implicitly[TypeInformation[KEY]]
+
val cleanFun1 = clean(fun1)
val cleanFun2 = clean(fun2)
-
- val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
- val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
-
- asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2))
+
+ val keyExtractor1 = new JavaKeySelector[IN1, KEY](cleanFun1)
+ val keyExtractor2 = new JavaKeySelector[IN2, KEY](cleanFun2)
+
+ asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2, keyType))
}
/**
@@ -332,12 +330,6 @@ class ConnectedStreams[IN1, IN2](javaStream:
JavaCStream[IN1, IN2]) {
}
@Internal
-class KeySelectorWithType[IN, K](
- private[this] val fun: IN => K,
- private[this] val info: TypeInformation[K])
- extends KeySelector[IN, K] with ResultTypeQueryable[K] {
-
+class JavaKeySelector[IN, K](private[this] val fun: IN => K) extends
KeySelector[IN, K] {
override def getKey(value: IN): K = fun(value)
-
- override def getProducedType: TypeInformation[K] = info
}
diff --git
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
index fd48f9c..079701d 100644
---
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
+++
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.Experimental
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.{DataStreamUtils =>
JavaStreamUtils}
import scala.collection.JavaConverters._
@@ -46,11 +47,11 @@ class DataStreamUtils[T: TypeInformation : ClassTag](val
self: DataStream[T]) {
/**
* Reinterprets the given [[DataStream]] as a [[KeyedStream]], which
extracts keys with the
- * given [[KeySelectorWithType]].
+ * given [[KeySelector]].
*
* IMPORTANT: For every partition of the base stream, the keys of events in
the base stream
* must be partitioned exactly in the same way as if it was created through
a
- * [[DataStream#keyBy(KeySelectorWithType)]].
+ * [[DataStream#keyBy(KeySelector)]].
*
* @param keySelector Function that defines how keys are extracted from the
data stream.
* @return The reinterpretation of the [[DataStream]] as a [[KeyedStream]].
@@ -58,11 +59,12 @@ class DataStreamUtils[T: TypeInformation : ClassTag](val
self: DataStream[T]) {
def reinterpretAsKeyedStream[K: TypeInformation](
keySelector: T => K): KeyedStream[T, K] = {
- val keySelectorWithType =
- new KeySelectorWithType[T, K](clean(keySelector),
implicitly[TypeInformation[K]])
+ val keyTypeInfo = implicitly[TypeInformation[K]]
+ val cleanSelector = clean(keySelector)
+ val javaKeySelector = new JavaKeySelector[T, K](cleanSelector)
asScalaStream(
- JavaStreamUtils.reinterpretAsKeyedStream(self.javaStream,
keySelectorWithType))
+ JavaStreamUtils.reinterpretAsKeyedStream(self.javaStream,
javaKeySelector, keyTypeInfo))
}
private[flink] def clean[F <: AnyRef](f: F): F = {
diff --git
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
index deb03a3..bc6733c 100644
---
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
+++
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
@@ -72,7 +72,7 @@ class OnConnectedStream[IN1, IN2](stream:
ConnectedStreams[IN1, IN2]) {
* @return The key-grouped connected streams
*/
@PublicEvolving
- def keyingBy[K1: TypeInformation, K2: TypeInformation](key1: IN1 => K1,
key2: IN2 => K2):
+ def keyingBy[KEY: TypeInformation](key1: IN1 => KEY, key2: IN2 => KEY):
ConnectedStreams[IN1, IN2] =
stream.keyBy(key1, key2)