mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1208303417
Unfortunately, I'm stuck on some flaky tests. It looks like watermarks are
not advanced deterministically.
Below are some logs from
`org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy` (edited
for readability).
Successful run (the watermark is advanced early enough for the timer to fire):
```
14:42:52,938 [3] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: input elements:
[ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,940 [3] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: non expired input elements:
[ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:42:52,949 [3] TRACE WindowTracing -
ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for
key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN;
outputWatermark:null
14:42:52,957 [3] TRACE WindowTracing - WatermarkHold.addHolds: element hold
at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow;
inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:42:52,960 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are
SparkTimerInternals{highWatermark=BOUNDEDW_MIN,
synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=,
namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX,
outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}],
inputWatermark=BOUNDEDW_MIN}
14:42:52,961 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are []
[inputWatermark: BOUNDEDW_MIN]
14:42:52,962 [3] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: output elements are 0
```
```
14:42:53,137 [spark-listener-group-appStatus] INFO GlobalWatermarkHolder -
Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN,
highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
```
```
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are
SparkTimerInternals{highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW,
timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow),
timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME,
deleted=false}], inputWatermark=BOUNDEDW_MIN}
14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are
[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow),
timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME,
deleted=false}] [inputWatermark: BOUNDEDW_MAX]
```
```
14:42:53,146 [15] DEBUG WindowTracing - ReduceFnRunner: Received timer
key:Row2; window:GlobalWindow; data:TimerData{timerId=0, timerFamilyId=,
namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX,
outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false} with
inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing - ReduceFnRunner: Cleaning up for
key:Row2; window:GlobalWindow with inputWatermark:BOUNDEDW_MAX;
outputWatermark:null
14:42:53,148 [15] DEBUG WindowTracing - WatermarkHold.extractAndRelease:
for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX;
outputWatermark:null
14:42:53,149 [15] DEBUG WindowTracing -
WatermarkHold.extractAndRelease.read: clearing for key:Row2; window:GlobalWindow
14:42:53,150 [15] DEBUG WindowTracing - describePane: ON_TIME pane (prev
was null) for key:Row2; windowMaxTimestamp:GLOBALW_MAX;
inputWatermark:BOUNDEDW_MAX; outputWatermark:null; isLateForOutput:false
14:42:53,152 [15] TRACE WindowTracing - ReduceFnRunner.onTrigger:
outputWindowedValue key:Row2 value:[Row1] at GLOBALW_MAX
14:42:53,152 [15] DEBUG WindowTracing - WatermarkHold.clearHolds: For
key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
14:42:53,153 [15] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: output elements are
TimestampedValueInGlobalWindow{value=KV{Row2, [Row1]}, timestamp=GLOBALW_MAX,
pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
onTimeIndex=0}} 1
```
Failed run (the watermark is advanced only after the second batch has been
processed, so the timer never fires and the element is lost):
```
14:41:51,453 [3] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: input elements:
[ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,455 [3] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: non expired input elements:
[ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
14:41:51,463 [3] TRACE WindowTracing -
ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for
key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN;
outputWatermark:null
14:41:51,471 [3] TRACE WindowTracing - WatermarkHold.addHolds: element hold
at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow;
inputWatermark:BOUNDEDW_MIN; outputWatermark:null
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are
SparkTimerInternals{highWatermark=BOUNDEDW_MIN,
synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=,
namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX,
outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}],
inputWatermark=BOUNDEDW_MIN}
14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are []
[inputWatermark: BOUNDEDW_MIN]
14:41:51,476 [3] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: output elements are 0
```
```
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are
SparkTimerInternals{highWatermark=BOUNDEDW_MIN,
synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=,
namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX,
outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}],
inputWatermark=BOUNDEDW_MIN}
14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are []
[inputWatermark: BOUNDEDW_MIN]
14:41:51,658 [15] TRACE SparkGroupAlsoByWindowViaWindowSet -
Group.ByFields/ToKvs/GroupByKey: output elements are 0
```
```
14:41:51,662 [spark-listener-group-appStatus] INFO GlobalWatermarkHolder -
Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN,
highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
```
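To make the race explicit, here is a minimal, hypothetical model of the eligibility check seen in the logs. It is not the runner's actual code: `WatermarkRaceSketch`, `eligibleTimers`, and the numeric constants are made up for illustration, and it assumes a timer becomes eligible once the input watermark has reached the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkRaceSketch {
    // Stand-ins for the symbolic values in the logs; the numbers are arbitrary
    // and only preserve the ordering MIN < GLOBALW_MAX < BOUNDEDW_MAX.
    static final long WATERMARK_MIN = Long.MIN_VALUE;          // BOUNDEDW_MIN
    static final long WATERMARK_MAX = Long.MAX_VALUE;          // BOUNDEDW_MAX
    static final long GLOBAL_WINDOW_MAX = Long.MAX_VALUE - 1;  // GLOBALW_MAX

    /**
     * Returns the timers whose timestamp has been reached by the input watermark.
     * Assumption: "reached" means timestamp <= watermark.
     */
    static List<Long> eligibleTimers(List<Long> timers, long inputWatermark) {
        List<Long> eligible = new ArrayList<>();
        for (long timestamp : timers) {
            if (timestamp <= inputWatermark) {
                eligible.add(timestamp);
            }
        }
        return eligible;
    }

    public static void main(String[] args) {
        // The garbage-collection timer scheduled at the end of the global window.
        List<Long> timers = List.of(GLOBAL_WINDOW_MAX);

        // Failed run: the batch is processed before the new watermark block is
        // published, so the timer is not eligible and nothing is emitted.
        System.out.println(eligibleTimers(timers, WATERMARK_MIN).size()); // prints 0

        // Successful run: the watermark was published first, so the timer fires.
        System.out.println(eligibleTimers(timers, WATERMARK_MAX).size()); // prints 1
    }
}
```

In this model, the only difference between the two runs is which watermark value the batch observes, which matches the ordering of the `GlobalWatermarkHolder` line relative to thread `[15]` in the logs above.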