[
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348790#comment-16348790
]
ASF GitHub Bot commented on FLINK-8345:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5230#discussion_r165397632
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
---
@@ -753,6 +763,182 @@ public void onTimer(
assertTrue(getOperatorForDataStream(processed) instanceof
ProcessOperator);
}
+ @Test
+ public void testConnectWithBroadcastTranslation() throws Exception {
+
+ final Map<Long, String> expected = new HashMap<>();
+ expected.put(0L, "test:0");
+ expected.put(1L, "test:1");
+ expected.put(2L, "test:2");
+ expected.put(3L, "test:3");
+ expected.put(4L, "test:4");
+ expected.put(5L, "test:5");
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+ .assignTimestampsAndWatermarks(new
CustomWmEmitter<Long>() {
+
+ @Override
+ public long extractTimestamp(Long
element, long previousElementTimestamp) {
+ return element;
+ }
+ }).keyBy((KeySelector<Long, Long>) value ->
value);
+
+ final DataStream<String> srcTwo =
env.fromCollection(expected.values())
+ .assignTimestampsAndWatermarks(new
CustomWmEmitter<String>() {
+ @Override
+ public long extractTimestamp(String
element, long previousElementTimestamp) {
+ return
Long.parseLong(element.split(":")[1]);
+ }
+ });
+
+ final BroadcastStream<String> broadcast =
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+ // the timestamp should be high enough to trigger the timer
after all the elements arrive.
+ final DataStream<String> output =
srcOne.connect(broadcast).process(
+ new TestBroadcastProcessFunction(100000L,
expected));
+
+ output.addSink(new DiscardingSink<>());
+ env.execute();
--- End diff --
So far, all tests in this are purely translation tests. I mentioned this in
another comment, that it would be good to have an ITCase that actually verifies
that using keyed state works and that the other features work as well in a
complete program. Have a look at `SideOutputITCase`, for example. 👍
> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)