[
https://issues.apache.org/jira/browse/FLINK-25199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17459042#comment-17459042
]
Marios Trivyzas commented on FLINK-25199:
-----------------------------------------
This issue is not Table/SQL-related; it originates from self-{*}union{*}ing a
DataStream, so one can reproduce it with:
{noformat}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3).setParallelism(1);
// add a printout statement to print the emitted watermark
dataStream1.union(dataStream1).print();
// or use .addSink(new DiscardingSink<>()) and debug to check that
// writeWatermark() on the DiscardingSink is never called
env.execute();{noformat}
If, instead of the "self" union, we union two separate DataStreams like:
{noformat}
DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3).setParallelism(1);
DataStream<Integer> dataStream2 = dataStream1.map(i -> i + 1).setParallelism(1);
dataStream1.union(dataStream2).print(); // or .addSink(new DiscardingSink<>());
env.execute();{noformat}
Then the watermark is emitted correctly.
From some debugging in
*StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels*,
the following condition turns out to be false:
{noformat}
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
output.emitWatermark(new Watermark(lastOutputWatermark));
}{noformat}
because two channels are involved, and in the for loop that precedes that code:
{noformat}
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
hasAlignedChannels = true;
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}{noformat}
One of the channels is updated with the watermark set to *Long.MAX_VALUE*, but
the other channel still has the initial value of *Long.MIN_VALUE*, so the overall
*Math.min* yields *Long.MIN_VALUE*.
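The arithmetic can be reproduced in isolation. This is a hypothetical, simplified model, not Flink's code: the class and method names are invented for illustration, every channel is assumed to be aligned, and the loop accumulator is assumed to start at *Long.MAX_VALUE*:

```java
// Hypothetical, simplified model of the minimum computed in
// findAndOutputNewMinWatermarkAcrossAlignedChannels; all channels are treated as aligned.
final class MinWatermarkSketch {

    /** Minimum over all channel watermarks, starting from Long.MAX_VALUE. */
    static long minAcrossChannels(long[] channelWatermarks) {
        long newMinWatermark = Long.MAX_VALUE;
        for (long watermark : channelWatermarks) {
            newMinWatermark = Math.min(watermark, newMinWatermark);
        }
        return newMinWatermark;
    }

    /** Mirrors the emit condition: only a strictly larger minimum is emitted. */
    static boolean wouldEmit(long[] channelWatermarks, long lastOutputWatermark) {
        return minAcrossChannels(channelWatermarks) > lastOutputWatermark;
    }

    public static void main(String[] args) {
        // Channel 0 has seen the final MAX watermark; channel 1 is still at its initial value.
        long[] channels = {Long.MAX_VALUE, Long.MIN_VALUE};
        System.out.println(minAcrossChannels(channels) == Long.MIN_VALUE); // true
        System.out.println(wouldEmit(channels, Long.MIN_VALUE)); // false
    }
}
```

As long as one channel stays at *Long.MIN_VALUE*, no other channel's value can lift the minimum above the initial *lastOutputWatermark*.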
That's because *StatusWatermarkValve#inputWatermark*, which
updates the channel's watermark in line *95*, is always called from
*AbstractStreamTaskNetworkInput#processElement* with
the same *lastChannel*, which is only one of the two channels used in this union
case.
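A toy model of the per-channel bookkeeping illustrates why delivering both *Long.MAX_VALUE* watermarks through the same channel index never advances the output. *ToyValve* and its methods are hypothetical and only loosely mirror *StatusWatermarkValve#inputWatermark*; idleness and alignment handling are omitted:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of per-channel watermark tracking; it only loosely
// mirrors StatusWatermarkValve and ignores idleness/alignment entirely.
final class ToyValve {
    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    ToyValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Every channel starts without a watermark.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark on one channel and emits the new minimum if it advanced. */
    void inputWatermark(long watermark, int channelIndex) {
        if (watermark > channelWatermarks[channelIndex]) {
            channelWatermarks[channelIndex] = watermark;
            long newMin = Long.MAX_VALUE;
            for (long wm : channelWatermarks) {
                newMin = Math.min(wm, newMin);
            }
            if (newMin > lastOutputWatermark) {
                lastOutputWatermark = newMin;
                emitted.add(newMin);
            }
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // Self-union: both MAX watermarks arrive tagged with the same channel.
        ToyValve selfUnion = new ToyValve(2);
        selfUnion.inputWatermark(Long.MAX_VALUE, 0);
        selfUnion.inputWatermark(Long.MAX_VALUE, 0);
        System.out.println("self-union emitted: " + selfUnion.emitted().size()); // 0

        // Two distinct inputs: each MAX watermark lands on its own channel.
        ToyValve twoInputs = new ToyValve(2);
        twoInputs.inputWatermark(Long.MAX_VALUE, 0);
        twoInputs.inputWatermark(Long.MAX_VALUE, 1);
        System.out.println("two inputs emitted: " + twoInputs.emitted().size()); // 1
    }
}
```

In this toy model, the second *Long.MAX_VALUE* in the self-union case is simply a no-op for channel 0, so channel 1 stays at *Long.MIN_VALUE* and nothing is ever emitted.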
> fromValues does not emit final MAX watermark
> --------------------------------------------
>
> Key: FLINK-25199
> URL: https://issues.apache.org/jira/browse/FLINK-25199
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Reporter: Timo Walther
> Assignee: Dawid Wysakowicz
> Priority: Critical
> Fix For: 1.15.0, 1.14.2
>
>
> It seems that {{fromValues}} with multiple rows does not emit any
> watermarks:
> {code}
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> Table inputTable =
> tEnv.fromValues(
> DataTypes.ROW(
> DataTypes.FIELD("weight", DataTypes.DOUBLE()),
> DataTypes.FIELD("f0", DataTypes.STRING()),
> DataTypes.FIELD("f1", DataTypes.DOUBLE()),
> DataTypes.FIELD("f2", DataTypes.DOUBLE()),
> DataTypes.FIELD("f3", DataTypes.DOUBLE()),
> DataTypes.FIELD("f4", DataTypes.INT()),
> DataTypes.FIELD("label", DataTypes.STRING())),
> Row.of(1., "a", 1., 1., 1., 2, "l1"),
> Row.of(1., "a", 1., 1., 1., 2, "l1"));
> DataStream<Row> input = tEnv.toDataStream(inputTable);
> {code}
> {{fromValues(1, 2, 3)}} or {{fromValues}} with only 1 row works correctly.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)