[ https://issues.apache.org/jira/browse/FLINK-25199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17459042#comment-17459042 ]

Marios Trivyzas commented on FLINK-25199:
-----------------------------------------

This issue is not Table/SQL related. It originates from {*}union{*}ing a datastream with itself, so it can be reproduced with:

{noformat}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3).setParallelism(1);

// add a printout statement to print the emitted watermark
dataStream1.union(dataStream1).print();
// or use .addSink(new DiscardingSink<>()); and debug to check that
// writeWatermark() on the DiscardingSink is not called
env.execute();{noformat}
If, instead of the self-union, we union two separate datastreams, like:

{noformat}
DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3).setParallelism(1);
DataStream<Integer> dataStream2 = dataStream1.map(i -> i + 1).setParallelism(1);

dataStream1.union(dataStream2).print(); // or .addSink(new DiscardingSink<>());
env.execute();{noformat}
Then the watermark is emitted correctly.

From some debugging in *StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels*, the following condition turns out to be false:
{noformat}
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}{noformat}

This is because two channels are involved, and in the for loop just above that code:

{noformat}
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}{noformat}
One of the channels is updated with its watermark set to *Long.MAX_VALUE*, but the other channel still holds its initial value of *Long.MIN_VALUE*, so the overall *Math.min* yields *Long.MIN_VALUE*.
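
To make this concrete, here is a minimal, self-contained sketch of the "min across aligned channels" step quoted above. It is a hypothetical simplification, not Flink's actual {{StatusWatermarkValve}}: the class {{MinWatermarkSketch}}, the {{watermarks}}/{{aligned}} arrays and the {{emitted}} list (used instead of calling {{output.emitWatermark(...)}}) are illustrative assumptions that only mirror the two excerpts above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplification of the valve's "min across aligned channels" step. */
class MinWatermarkSketch {
    // Per-channel state: every channel starts at Long.MIN_VALUE.
    final long[] watermarks;
    final boolean[] aligned;
    long lastOutputWatermark = Long.MIN_VALUE;
    // Watermarks that would be emitted downstream (recorded instead of emitted).
    final List<Long> emitted = new ArrayList<>();

    MinWatermarkSketch(int numChannels) {
        watermarks = new long[numChannels];
        aligned = new boolean[numChannels];
        for (int i = 0; i < numChannels; i++) {
            watermarks[i] = Long.MIN_VALUE;
            aligned[i] = true;
        }
    }

    /** Mirrors the for loop and the emit condition quoted above. */
    void findAndOutputNewMinWatermark() {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (aligned[i]) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(watermarks[i], newMinWatermark);
            }
        }
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            emitted.add(lastOutputWatermark);
        }
    }

    public static void main(String[] args) {
        MinWatermarkSketch valve = new MinWatermarkSketch(2);
        // Only channel 0 receives the final MAX watermark.
        valve.watermarks[0] = Long.MAX_VALUE;
        valve.findAndOutputNewMinWatermark();
        System.out.println("after channel 0 only: " + valve.emitted); // []
        // Once channel 1 also reaches MAX, the minimum advances and MAX is emitted.
        valve.watermarks[1] = Long.MAX_VALUE;
        valve.findAndOutputNewMinWatermark();
        System.out.println("after both channels: " + valve.emitted);
    }
}
```

As long as one channel stays at {{Long.MIN_VALUE}}, the minimum can never exceed {{lastOutputWatermark}}, so nothing is emitted.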

That's because {*}StatusWatermarkValve#inputWatermark{*}, which updates the channel's watermark at line *95*, is always called from *AbstractStreamTaskNetworkInput#processElement* with the same *lastChannel*, which is just one of the two channels involved in this union case.
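
The effect of reusing the same channel index can be sketched with a small hypothetical model: two physical channels each deliver a final MAX watermark, and we either attribute each one to its own channel or tag both with channel 0 (mimicking the stale *lastChannel* described above). The class {{ChannelAttributionSketch}} and its {{inputWatermark(long, int)}} and {{run(boolean)}} methods are illustrative assumptions, not Flink APIs, and all channels are assumed to be aligned.

```java
import java.util.Arrays;

/** Hypothetical model: two upstream channels, each sending its final MAX watermark. */
class ChannelAttributionSketch {
    final long[] channelWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    long lastOutputWatermark = Long.MIN_VALUE;

    /** Records a watermark for the given channel, then recomputes the min over all channels. */
    void inputWatermark(long watermark, int channelIndex) {
        if (watermark > channelWatermarks[channelIndex]) {
            channelWatermarks[channelIndex] = watermark;
        }
        long newMin = Arrays.stream(channelWatermarks).min().getAsLong();
        if (newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
        }
    }

    /**
     * Delivers one MAX watermark per physical channel (0 and 1).
     * If attributeToSameChannel is true, both are tagged with channel 0.
     */
    static long run(boolean attributeToSameChannel) {
        ChannelAttributionSketch valve = new ChannelAttributionSketch();
        int[] physicalChannels = {0, 1};
        for (int physical : physicalChannels) {
            int reported = attributeToSameChannel ? 0 : physical;
            valve.inputWatermark(Long.MAX_VALUE, reported);
        }
        return valve.lastOutputWatermark;
    }

    public static void main(String[] args) {
        System.out.println(run(true) == Long.MIN_VALUE);  // true: final watermark is stuck
        System.out.println(run(false) == Long.MAX_VALUE); // true: MAX reaches downstream
    }
}
```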

> fromValues does not emit final MAX watermark
> --------------------------------------------
>
>                 Key: FLINK-25199
>                 URL: https://issues.apache.org/jira/browse/FLINK-25199
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>            Reporter: Timo Walther
>            Assignee: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.15.0, 1.14.2
>
>
> It seems that {{fromValues}} generating multiple rows does not emit any
> watermarks:
> {code}
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         Table inputTable =
>                 tEnv.fromValues(
>                         DataTypes.ROW(
>                                 DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f0", DataTypes.STRING()),
>                                 DataTypes.FIELD("f1", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f2", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f3", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f4", DataTypes.INT()),
>                                 DataTypes.FIELD("label", DataTypes.STRING())),
>                         Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                         Row.of(1., "a", 1., 1., 1., 2, "l1"));
>         DataStream<Row> input = tEnv.toDataStream(inputTable);
> {code}
> {{fromValues(1, 2, 3)}} or {{fromValues}} with only 1 row works correctly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
