mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1208303417

   Unfortunately I'm stuck with some flaky tests. It looks like watermarks are not advanced deterministically.
   Below are some logs of `org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy` (edited for readability).
   
   Successful run (the watermark is advanced early enough for the timer to fire):
   
   ```
   14:42:52,938 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:42:52,940 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:42:52,949 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:42:52,957 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:42:52,960 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:42:52,961 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
   14:42:52,962 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
   ```
   ```
   14:42:53,137 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
   ```
   ```
   14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}] [inputWatermark: BOUNDEDW_MAX]
   ```
   ```
   14:42:53,146 [15] DEBUG WindowTracing  - ReduceFnRunner: Received timer key:Row2; window:GlobalWindow; data:TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false} with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,148 [15] DEBUG WindowTracing  - ReduceFnRunner: Cleaning up for key:Row2; window:GlobalWindow with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,148 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease: for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,149 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease.read: clearing for key:Row2; window:GlobalWindow
   14:42:53,150 [15] DEBUG WindowTracing  - describePane: ON_TIME pane (prev was null) for key:Row2; windowMaxTimestamp:GLOBALW_MAX; inputWatermark:BOUNDEDW_MAX; outputWatermark:null; isLateForOutput:false
   14:42:53,152 [15] TRACE WindowTracing  - ReduceFnRunner.onTrigger: outputWindowedValue key:Row2 value:[Row1] at GLOBALW_MAX
   14:42:53,152 [15] DEBUG WindowTracing  - WatermarkHold.clearHolds: For key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,153 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are TimestampedValueInGlobalWindow{value=KV{Row2, [Row1]}, timestamp=GLOBALW_MAX, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} 1
   ```
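The difference between batch [3] and batch [15] in the successful run comes down to which timers count as eligible. A minimal sketch of that rule, assuming (not taken from the runner's source) that an event-time timer becomes eligible once the input watermark is strictly after its timestamp. `TimerEligibilitySketch` and `eligible` are hypothetical names, and the constants only approximate Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE`, `BoundedWindow.TIMESTAMP_MAX_VALUE` and the global window's max timestamp (end of time minus one day) as epoch millis:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of event-time timer eligibility; not the Spark runner's code. */
public class TimerEligibilitySketch {
  // Assumed approximations of BOUNDEDW_MIN, BOUNDEDW_MAX and GLOBALW_MAX in epoch millis.
  static final long BOUNDEDW_MIN = Long.MIN_VALUE / 1000;
  static final long BOUNDEDW_MAX = Long.MAX_VALUE / 1000;
  static final long GLOBALW_MAX = BOUNDEDW_MAX - 24L * 60 * 60 * 1000;

  /** Returns the timer timestamps that the given input watermark has already passed. */
  static List<Long> eligible(List<Long> timerTimestamps, long inputWatermark) {
    List<Long> ready = new ArrayList<>();
    for (long ts : timerTimestamps) {
      if (ts < inputWatermark) { // assumed "strictly before the watermark" rule
        ready.add(ts);
      }
    }
    return ready;
  }

  public static void main(String[] args) {
    // The garbage-collection timer scheduled for key Row2 in the global window.
    List<Long> timers = List.of(GLOBALW_MAX);
    // Batch [3]: watermark still at BOUNDEDW_MIN, so nothing is eligible.
    System.out.println(eligible(timers, BOUNDEDW_MIN).size()); // 0
    // Batch [15]: watermark at BOUNDEDW_MAX, so the timer is eligible.
    System.out.println(eligible(timers, BOUNDEDW_MAX).size()); // 1
  }
}
```

Under this model the element can only be emitted by a batch whose timer internals already carry BOUNDEDW_MAX, which is what happens in batch [15] of the successful run.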
   
   Failed run (the watermark is advanced too late, so the timer does not fire and the element is lost):
   ```
   14:41:51,453 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:41:51,455 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:41:51,463 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:41:51,471 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
   14:41:51,476 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
   ```
   ```
   14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
   14:41:51,658 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
   ```
   ```
   14:41:51,662 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
   ```
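In the failed run, batch [15] logs its timer internals at 14:41:51,658, while the new watermark block is only put at 14:41:51,662; in the successful run the block is put at 14:42:53,137, before batch [15] logs at 14:42:53,146. A hypothetical model of that race, assuming a batch sees the new watermark only if the block was put before the batch read it, and treating the logged timestamps as approximate read times. `WatermarkRaceSketch` and its methods are illustrative names, not runner APIs, and the constants are the same approximations of BOUNDEDW_MIN, BOUNDEDW_MAX and GLOBALW_MAX in epoch millis:

```java
/** Hypothetical model of the ordering visible in the two logs; not the Spark runner's code. */
public class WatermarkRaceSketch {
  // Assumed approximations of BOUNDEDW_MIN, BOUNDEDW_MAX and GLOBALW_MAX in epoch millis.
  static final long BOUNDEDW_MIN = Long.MIN_VALUE / 1000;
  static final long BOUNDEDW_MAX = Long.MAX_VALUE / 1000;
  static final long GLOBALW_MAX = BOUNDEDW_MAX - 24L * 60 * 60 * 1000;

  /** Watermark a batch sees: the new block only if it was put no later than the batch read. */
  static long watermarkSeenByBatch(long putMillis, long batchReadMillis) {
    return putMillis <= batchReadMillis ? BOUNDEDW_MAX : BOUNDEDW_MIN;
  }

  /** True if the garbage-collection timer at GLOBALW_MAX becomes eligible in that batch. */
  static boolean timerFires(long putMillis, long batchReadMillis) {
    return GLOBALW_MAX < watermarkSeenByBatch(putMillis, batchReadMillis);
  }

  /** Converts a log wall-clock time (HH:mm:ss,SSS) to milliseconds since midnight. */
  static long millisOfDay(int h, int m, int s, int ms) {
    return ((h * 60L + m) * 60L + s) * 1000L + ms;
  }

  public static void main(String[] args) {
    // Successful run: block put at 14:42:53,137, batch [15] at 14:42:53,146.
    boolean ok = timerFires(millisOfDay(14, 42, 53, 137), millisOfDay(14, 42, 53, 146));
    // Failed run: batch [15] at 14:41:51,658, block put only at 14:41:51,662.
    boolean failed = timerFires(millisOfDay(14, 41, 51, 662), millisOfDay(14, 41, 51, 658));
    System.out.println(ok + " " + failed); // true false
  }
}
```

Under these assumptions, whether `KV{Row2, [Row1]}` is emitted depends only on how the `spark-listener-group-appStatus` thread is scheduled relative to batch [15], which would be consistent with the flakiness, provided no later batch processes the key before the pipeline shuts down.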