InigoSJ opened a new issue, #23071:
URL: https://github.com/apache/beam/issues/23071

   ### What happened?
   
   The Python trigger `AfterProcessingTime` behaves differently from Java's `AfterProcessingTime.pastFirstElementInPane().plusDelayOf`.
   
   Java behaves as "wait X time since the first element, then trigger". Python instead behaves like a Session Window: each new element resets the timer, so the wait is measured from the previous element rather than from the first element in the pane:
   
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py#L387
   ```
     def on_element(self, element, window, context):
       context.set_timer(
           '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
   ```
   
   The following example shows the difference:
   
   **Python**
   ```
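   from apache_beam import GroupByKey, Map, WindowInto
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.testing.test_pipeline import TestPipeline
   from apache_beam.testing.test_stream import TestStream
   from apache_beam.transforms import trigger
   from apache_beam.transforms.window import FixedWindows, TimestampedValue

   def get_input_stream():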
     stream = (
         TestStream().add_elements([
             TimestampedValue("1", timestamp=0)
         ])
         .advance_watermark_to(1.5)
         .advance_processing_time(1.5)
         .add_elements([
             TimestampedValue("2", timestamp=1)
         ])
         .advance_watermark_to(2.5)
         .advance_processing_time(1)  # Running processing time 2.5 (it should trigger now)
         .add_elements([
             TimestampedValue("3", timestamp=3)
         ])
         .advance_watermark_to(3)
         .advance_processing_time(0.5) # Running Processing time 3
         .add_elements([
             TimestampedValue("4", timestamp=4)
         ])
         .advance_watermark_to(5)
         .advance_processing_time(2)  # Running processing time 5, it triggers now since it's 2s since the last element
         .add_elements([
             TimestampedValue("5", timestamp=4)
         ])
         .advance_watermark_to(6)
         .advance_processing_time(1.5)
         .add_elements([
             TimestampedValue("6", timestamp=8)
         ])
         .advance_watermark_to(9)
         .advance_processing_time(1.5)
         .add_elements([
             TimestampedValue("7", timestamp=7),
         ])
         .advance_watermark_to(10)
         .advance_processing_time(1.5)
         .add_elements([
             TimestampedValue("8", timestamp=8),
         ])
         .advance_watermark_to(11)
         .advance_processing_time(1.5)
         .add_elements([
             TimestampedValue("9", timestamp=7),
         ])
         .advance_watermark_to(12)
         .advance_processing_time(1.5)  # Running processing time 12.5, no trigger since 5
         .add_elements([
             TimestampedValue("10", timestamp=9),
         ])
         .advance_processing_time(2)
         .advance_watermark_to_infinity()
     )
     return stream
   
   options = PipelineOptions(streaming=True)
   
   p = TestPipeline(options=options)
   
   window_size_seconds = 10
   window_allowed_lateness_seconds = 5
   count_pass = 3
   delay = 2
   
   stream = get_input_stream()
   
   (p | stream
      | WindowInto(
         FixedWindows(size=window_size_seconds),
         allowed_lateness=window_allowed_lateness_seconds,
         accumulation_mode=trigger.AccumulationMode.DISCARDING,
         trigger=trigger.Repeatedly(trigger.AfterProcessingTime(delay))
     )
    | Map(lambda e: ("key", e))
    | GroupByKey()
    | Map(print)
   )
   
   p.run()
   ```
   
   **Java**
   
   ```
   Pipeline p = Pipeline.create(options);
   
           Integer windowLength = 10;
           Integer allowLateSize = 5;
           Integer delay = 2;
   
           TestStream<String> streamEvents = TestStream.create(StringUtf8Coder.of())
                   .addElements(
                           TimestampedValue.of("1", new Instant(0))
                   )
                   .advanceWatermarkTo(new Instant(1500))
                   .advanceProcessingTime(Duration.millis(1500))
                   .addElements(
                           TimestampedValue.of("2", new Instant(1000))
                   )
                   .advanceWatermarkTo(new Instant(2500))
                   .advanceProcessingTime(Duration.millis(1000))  // Running processing time 2.5, it triggers here
                   .addElements(
                           TimestampedValue.of("3", new Instant(3000))
                   )
                   .advanceWatermarkTo(new Instant(3000))
                   .advanceProcessingTime(Duration.millis(1500))
                   .addElements(
                           TimestampedValue.of("4", new Instant(4000))
                   )
                   .advanceWatermarkTo(new Instant(5000))
                   .advanceProcessingTime(Duration.standardSeconds(2))
                   .addElements(
                           TimestampedValue.of("5", new Instant(4000))
                   )
                   .advanceWatermarkTo(new Instant(6000))
                   .advanceProcessingTime(Duration.millis(1500))
                   .addElements(
                           TimestampedValue.of("6", new Instant(8000))
                           )
                   .advanceWatermarkTo(new Instant(9000))
                   .advanceProcessingTime(Duration.millis(1500))
                   .addElements(
                           TimestampedValue.of("7", new Instant(7000))
                   )
                   .advanceWatermarkTo(new Instant(10000))
                   .advanceProcessingTime(Duration.millis(1500))
                   .addElements(
                           TimestampedValue.of("8", new Instant(8000))
   
                   )
                   .advanceWatermarkTo(new Instant(11000))
                   .advanceProcessingTime(Duration.millis(1500))
                   .addElements(
                           TimestampedValue.of("9", new Instant(7000))
   
                   )
                   .advanceWatermarkTo(new Instant(12000))
                   .advanceProcessingTime(Duration.millis(1500))
                   .addElements(
                           TimestampedValue.of("10", new Instant(9000))
                   )
                   .advanceWatermarkToInfinity();
   
           p.apply(streamEvents)
                   .apply("KVs", ParDo.of(new DoFn<String, KV<String, String>>() {
                       @ProcessElement
                       public void processElement(ProcessContext c) {
                           c.output(KV.of("key", c.element()));
                       }
                   }))
                   .apply(Window.<KV<String, String>>into(
                                   FixedWindows.of(Duration.standardSeconds(windowLength)))
                           .withAllowedLateness(Duration.standardSeconds(allowLateSize))
                           .triggering(
                                   Repeatedly.forever(
                                           AfterProcessingTime.pastFirstElementInPane()
                                                   .plusDelayOf(Duration.standardSeconds(delay))))
                           .discardingFiredPanes()
                   )
                   .apply(GroupByKey.create())
                   .apply("Log", ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
                       @ProcessElement
                       public void processElement(ProcessContext c) {
                           LOG.info("\n TRIGGER " + c.element().getValue().toString());
                           c.output(c.pane().toString());
                       }
                   }));
   
   
           p.run();
   ```
   
   
   The output in **Python** is two panes, `['1', '2', '3', '4'], ['5', '6', '7', '8', '9', '10']`, while **Java** produces the "right" output of five panes, `['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '10']`.
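
Both pane sequences can be reproduced without Beam by simulating only the timer logic. This is a rough sketch under stated assumptions, not the SDK's implementation: `simulate` and its `reset_on_every_element` flag are hypothetical names, the arrival times are the cumulative `advance_processing_time` values from the Python example, every element is treated as belonging to a single key and window, and a timer that expires exactly when an element arrives is assumed to fire before that element joins the pane.

```python
def simulate(arrivals, delay, end_time, reset_on_every_element):
    """Return the panes emitted by a repeated processing-time trigger.

    arrivals: list of (processing_time, value), sorted by processing_time.
    reset_on_every_element: True mimics the behavior reported for Python
    (every element overwrites the timer); False mimics the behavior
    reported for Java (only the first element of a pane sets the timer).
    """
    panes, pane, timer = [], [], None
    for now, value in arrivals:
        # Fire a pending timer that expired at or before this arrival.
        if timer is not None and timer <= now:
            panes.append(pane)
            pane, timer = [], None
        pane.append(value)
        if reset_on_every_element or timer is None:
            timer = now + delay
    # Fire the last pane if its timer expires before the stream ends.
    if pane and timer is not None and timer <= end_time:
        panes.append(pane)
    return panes


# Processing time at which each element is added in the Python example.
ARRIVALS = [(0.0, "1"), (1.5, "2"), (2.5, "3"), (3.0, "4"), (5.0, "5"),
            (6.5, "6"), (8.0, "7"), (9.5, "8"), (11.0, "9"), (12.5, "10")]

python_like = simulate(ARRIVALS, delay=2, end_time=14.5,
                       reset_on_every_element=True)
java_like = simulate(ARRIVALS, delay=2, end_time=14.5,
                     reset_on_every_element=False)
print(python_like)
print(java_like)
```

With these inputs, the reset-on-every-element variant yields the two panes reported for Python and the first-element variant yields the five panes reported for Java, which suggests the difference comes down to when the timer is (re)set.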
   
   ______________________
   
   
   The fix doesn't seem hard (worst thing to say ever), but given that users may already be relying on this trigger's current behavior, I am not sure how to proceed.
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: sdk-py-core

