[
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448904#comment-17448904
]
Wei-Che Wei edited comment on FLINK-25048 at 11/25/21, 2:41 AM:
----------------------------------------------------------------
Hi [~liangxinli], try this:
{code:java}
SingleOutputStreamOperator<String> process = lineDSS.process(
        new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out)
                    throws Exception {
                String[] s = value.split(" ");
                String word = s[0];
                String ts = s[1];
                // Words starting with "a" stay on the main stream;
                // everything else goes to the side output.
                if (word.startsWith("a")) {
                    out.collect(value);
                } else {
                    ctx.output(outputTag, value);
                }
            }
        });

// Timestamps and watermarks are assigned on the main stream only.
process.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
                .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
        .print("main stream>>>");

// Read the side output from the operator that actually emits it.
process.getSideOutput(outputTag).print("side output>>>");
{code}
The problem is that you called `getSideOutput` on a different
`SingleOutputStreamOperator`: the one returned by
`assignTimestampsAndWatermarks`, which only generates timestamps and
watermarks. The operator that actually emits the side output is the one
created by `lineDSS.process`.
If you want timestamps and watermarks on both streams, consider calling
`assignTimestampsAndWatermarks` before `process`.
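
To make the wiring issue concrete, here is a toy model in plain Java. It is
only a sketch: `Rec`, `Result`, `source`, `process` and `assignTimestamps`
are hypothetical stand-ins written for this illustration, not Flink's API.
The model assumes that each operator exposes only the side outputs it emitted
itself, and that a record keeps whatever timestamp it had when it entered
`process`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these classes are hypothetical stand-ins, not Flink's API.
final class SideOutputSketch {

    /** A record with an optional event timestamp (null means "not assigned yet"). */
    static final class Rec {
        final String value;
        final Long timestamp;
        Rec(String value, Long timestamp) { this.value = value; this.timestamp = timestamp; }
        @Override public String toString() { return value + "@" + timestamp; }
    }

    /** One operator's result: a main output plus side outputs keyed by tag name. */
    static final class Result {
        final List<Rec> main = new ArrayList<>();
        final Map<String, List<Rec>> side = new HashMap<>();
        /** Returns only what THIS operator emitted under the tag. */
        List<Rec> getSideOutput(String tag) {
            return side.getOrDefault(tag, new ArrayList<>());
        }
    }

    /** Wraps raw lines as records without timestamps, like a fresh source. */
    static List<Rec> source(String... lines) {
        List<Rec> out = new ArrayList<>();
        for (String line : lines) out.add(new Rec(line, null));
        return out;
    }

    /** Mirrors the ProcessFunction above: "a..." words go to main, the rest to the tag. */
    static Result process(List<Rec> in, String tag) {
        Result r = new Result();
        for (Rec rec : in) {
            if (rec.value.split(" ")[0].startsWith("a")) {
                r.main.add(rec);
            } else {
                r.side.computeIfAbsent(tag, k -> new ArrayList<>()).add(rec);
            }
        }
        return r;
    }

    /** Stand-in for the timestamp step: a new operator that emits main records only. */
    static Result assignTimestamps(List<Rec> in) {
        Result r = new Result();
        for (Rec rec : in) {
            r.main.add(new Rec(rec.value, Long.parseLong(rec.value.split(" ")[1])));
        }
        return r;
    }

    public static void main(String[] args) {
        String tag = "test";

        // Wiring from the bug report: the side output is requested from the timestamp operator.
        Result processed = process(source("apple 1000", "banana 2000"), tag);
        Result stamped = assignTimestamps(processed.main);
        System.out.println(stamped.getSideOutput(tag));   // empty: this operator never used the tag
        System.out.println(processed.getSideOutput(tag)); // the record is here, without a timestamp

        // Timestamps assigned before process: both outputs carry them.
        Result both = process(assignTimestamps(source("apple 1000", "banana 2000")).main, tag);
        System.out.println(both.main);
        System.out.println(both.getSideOutput(tag));
    }
}
```

In this model, asking the timestamp step for the tag returns nothing because
that step never wrote to it, which matches the behaviour described in the
issue. Moving the timestamp step in front of `process` is what lets the
side-output records carry timestamps as well.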
> When a watermark is set after an operator that uses an outputTag, the outputTag emits no data; data appears only on the main stream
> ---------------------------------------------------------------------------
>
> Key: FLINK-25048
> URL: https://issues.apache.org/jira/browse/FLINK-25048
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Reporter: xinli liang
> Priority: Major
> Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the input
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 9999);
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Transform the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(
>         new ProcessFunction<String, String>() {
>             @Override
>             public void processElement(String value, Context ctx, Collector<String> out)
>                     throws Exception {
>                 String[] s = value.split(" ");
>                 String word = s[0];
>                 String ts = s[1];
>                 if (word.startsWith("a")) {
>                     out.collect(value);
>                 } else {
>                     ctx.output(outputTag, value);
>                 }
>             }
>         })
>         .assignTimestampsAndWatermarks(WatermarkStrategy
>                 .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
>                 .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])));
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
>     env.execute();
> } catch (Exception e) {
>     e.printStackTrace();
> }