chenyuzhi459 commented on code in PR #18118:
URL: https://github.com/apache/flink/pull/18118#discussion_r965913266
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java:
##########
@@ -155,6 +159,128 @@ public void testJoinsCorrectlyWithMultipleKeys() throws Exception {
"(key2,5):(key2,5)");
}
+ private DataStream<Tuple2<String, Integer>> buildSourceStream(
+ final StreamExecutionEnvironment env, final SourceConsumer sourceConsumer) {
+ return env.addSource(
+ new SourceFunction<Tuple2<String, Integer>>() {
+ @Override
+ public void run(SourceContext<Tuple2<String, Integer>> ctx) {
+ sourceConsumer.accept(ctx);
+ }
+
+ @Override
+ public void cancel() {
+ // do nothing
+ }
+ });
+ }
+
+ // Ensure consumer func is serializable
+ private interface SourceConsumer
+ extends Serializable, Consumer<SourceFunction.SourceContext<Tuple2<String, Integer>>> {
+ long serialVersionUID = 1L;
+ }
+
+ @Test
+ public void testIntervalJoinSideOutputLeftLateData() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<Tuple2<String, Integer>> streamOne =
+ buildSourceStream(
+ env,
+ (ctx) -> {
+ ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
+ ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
+ ctx.emitWatermark(new Watermark(3));
+ ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L); // late data
+ });
+
+ DataStream<Tuple2<String, Integer>> streamTwo =
+ buildSourceStream(
+ env,
+ (ctx) -> {
+ ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
+ ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
+ ctx.emitWatermark(new Watermark(2));
+ ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
+ });
+
+ OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late") {};
+
+ SingleOutputStreamOperator<String> process =
+ streamOne
+ .keyBy(new Tuple2KeyExtractor())
+ .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
+ .between(Time.milliseconds(-1), Time.milliseconds(1))
+ .sideOutputLeftLateData(late)
+ .process(new CombineToStringJoinFunction());
+
+ process.getSideOutput(late)
Review Comment:
Ok, it has been optimized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]