yelianevich commented on issue #26465:
URL: https://github.com/apache/beam/issues/26465#issuecomment-1929855207

   @ballooncross Looking at your code I can suggest an alternative approach by 
using a custom global combiner that will eliminate subsequent 
`Latest.globally()` and `View.asSingleton()` and will do everything in one step.
   
   ```
   PCollectionView<Map<String, String>> map = pipeline.apply("Impulse", 
                                                
GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
                   .apply(Window.<Map<String, String>>into(new GlobalWindows())
                           
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                           .discardingFiredPanes())
                   .apply(Combine.globally(new MaxImpulseFn<>(functionToRead, 
coderOfFunctionOutput))
                        .withoutDefaults()
                        .asSingletonView());
   ```
   `MaxImpulseFn` is a custom combiner (even though quite simple) that finds 
the max impulse and reads the external data in `extractOutput`
   ```
   private static class MaxImpulseFn<T> extends 
CombineWithContext.CombineFnWithContext<Long, Long, T> {
   
          @Override
           public T extractOutput(Long accumulator, Context context) {
               return functionToRead.apply(accumulator, 
context.getPipelineOptions());
           }
   
           @Override
           public Coder<T> getDefaultOutputCoder(CoderRegistry registry, 
Coder<Long> inputCoder) {
               return coderOfFunctionOutput;
           }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to