[ 
https://issues.apache.org/jira/browse/BEAM-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688739#comment-16688739
 ] 

Kenneth Knowles commented on BEAM-6036:
---------------------------------------

This is a good question for StackOverflow. You have windowed into 
GlobalWindows, so all the KVs are in the same window. That is why they are all 
being returned as part of the View.asMap() side input.
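
To illustrate the point above, here is a toy model in plain Java. It is not Beam code and does not reflect how any runner is implemented. It assumes only that a side input view is made up of the elements that share a window. The class WindowScopedView, its Element type, and the "global" / "hour-N" window labels are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongFunction;

/** Toy model (hypothetical, not the Beam API) of a side input scoped to a window. */
class WindowScopedView {

    /** One key-value element stamped with the hour in which it was produced. */
    static final class Element {
        final long hour;
        final String key;
        final String value;

        Element(long hour, String key, String value) {
            this.hour = hour;
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Groups elements by the window label that windowFn assigns to their hour.
     * Each resulting group stands in for the contents of one side-input view.
     */
    static Map<String, List<String>> viewsByWindow(
            List<Element> elements, LongFunction<String> windowFn) {
        Map<String, List<String>> views = new TreeMap<>();
        for (Element e : elements) {
            views.computeIfAbsent(windowFn.apply(e.hour), w -> new ArrayList<>())
                 .add(e.key + "=" + e.value);
        }
        return views;
    }

    public static void main(String[] args) {
        List<Element> elements = Arrays.asList(
            new Element(0, "a", "old"), new Element(0, "b", "old"),
            new Element(1, "a", "new"), new Element(1, "c", "new"));

        // Everything assigned to one window: both hours land in the same view.
        System.out.println(viewsByWindow(elements, hour -> "global"));
        // prints {global=[a=old, b=old, a=new, c=new]}

        // One window per hour: each view holds only that hour's KVs.
        System.out.println(viewsByWindow(elements, hour -> "hour-" + hour));
        // prints {hour-0=[a=old, b=old], hour-1=[a=new, c=new]}
    }
}
```

In the first call every element gets the same window label, so KVs from adjacent hours end up side by side, which mirrors the mixing described in the issue.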

> How to periodically refresh side inputs
> ---------------------------------------
>
>                 Key: BEAM-6036
>                 URL: https://issues.apache.org/jira/browse/BEAM-6036
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Evgeny
>            Assignee: Kenneth Knowles
>            Priority: Blocker
>
> I have followed the example provided here 
> [https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1]
>  in the "Pattern: Slowly-changing lookup cache" section. I've converted the 
> pseudo-code from the article into this Java code:
> return p
>     .apply("GenerateSequence",
>         GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
>     .apply("GenerateSequenceWindow",
>         Window.<Long>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>             .discardingFiredPanes())
>     .apply("RetrieveKVs", ParDo.of(new RetrieveKVs()))
>     .apply("ToMap", View.asMap());
> RetrieveKVs() queries a BigQuery table and outputs KVs. 
> The issue here is that the resulting map mixes up KVs from different periods 
> (i.e. the sequence is generated every 1 hour, the resulting map includes KVs 
> from 2 adjacent hours).
> In an attempt to solve it I tried using View.asSingleton() instead.
> return p
>     .apply("GenerateSequence",
>         GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
>     .apply("GenerateSequenceWindow",
>         Window.<Long>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>             .discardingFiredPanes())
>     .apply("RetrieveMap", ParDo.of(new RetrieveMap()))
>     .apply("ToMap", View.asSingleton());
> RetrieveMap queries data from BigQuery and outputs the complete map. This 
> not only makes the tests flaky, failing about 1 time out of 10 with the 
> exception:
> Caused by: java.lang.IllegalArgumentException: Empty PCollection accessed as 
> a singleton view. Consider setting withDefault to provide a default value
> but it also doesn't seem to work. In the logs I see that RetrieveMap is 
> called every hour, but the pipeline using the side input gets stale data. 
> Is there a real working example of how to make a side input refresh 
> periodically? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
