Evgeny created BEAM-6036:
----------------------------

             Summary: How to periodically refresh side inputs.
                 Key: BEAM-6036
                 URL: https://issues.apache.org/jira/browse/BEAM-6036
             Project: Beam
          Issue Type: Bug
          Components: beam-model
            Reporter: Evgeny
            Assignee: Kenneth Knowles


I followed the example given in the "Pattern: Slowly-changing lookup cache"
section of
[https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1]
and converted the pseudo-code from the article into this Java code:


return p
    .apply("GenerateSequence",
        GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
    .apply("GenerateSequenceWindow",
        Window.<Long>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    .apply("ConvertToKVs",
        ParDo.of(new RetrieveToKVs()))
    .apply("ToMap", View.asMap());

RetrieveToKVs queries a BigQuery table and outputs the rows as KVs.

The issue here is that the resulting map mixes up KVs from different periods:
the sequence emits one element per hour, yet the resulting map includes KVs
from 2 adjacent hours.
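
To make the symptom concrete, here is a small Beam-free sketch. It is only an
illustration under assumptions: the class SnapshotDemo, its methods, and the
sample keys are hypothetical, and it does not model how any runner actually
builds the side input. It contrasts a map that mixes two hourly snapshots with
the one I expected, where the newer snapshot replaces the older one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of the symptom only. It does not use Beam and makes
// no claim about how a runner materializes a side input.
class SnapshotDemo {

    // Symptom: entries from the current hour are layered on top of the
    // previous hour's entries, so rows that no longer exist survive.
    static Map<String, String> mixed(Map<String, String> previousHour,
                                     Map<String, String> currentHour) {
        Map<String, String> result = new LinkedHashMap<>(previousHour);
        result.putAll(currentHour);
        return result;
    }

    // Expectation: the current hour's snapshot fully replaces the previous one.
    static Map<String, String> replaced(Map<String, String> previousHour,
                                        Map<String, String> currentHour) {
        return new LinkedHashMap<>(currentHour);
    }

    public static void main(String[] args) {
        // Hypothetical lookup rows; key "b" is gone from the table in hour 2.
        Map<String, String> hour1 = new LinkedHashMap<>();
        hour1.put("a", "1");
        hour1.put("b", "2");

        Map<String, String> hour2 = new LinkedHashMap<>();
        hour2.put("a", "10");
        hour2.put("c", "3");

        System.out.println("mixed:    " + mixed(hour1, hour2));    // {a=10, b=2, c=3}
        System.out.println("replaced: " + replaced(hour1, hour2)); // {a=10, c=3}
    }
}
```

The stale key "b" in the mixed result is the kind of entry I see in the side
input; the replaced result is what I want the map to look like after each
refresh.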

In an attempt to solve it I tried using View.asSingleton() instead.

        return p
            .apply("GenerateSequence",
                GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
            .apply("GenerateSequenceWindow",
                Window.<Long>into(new GlobalWindows())
                    .triggering(
                        Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply("ConvertToKVs",
                ParDo.of(new RetrieveMap()))
            .apply("ToMap", View.asSingleton());

RetrieveMap queries data from BigQuery and outputs the complete map. This
approach has two problems. First, it makes the tests flaky: about 1 run in 10
fails with this exception:

Caused by: java.lang.IllegalArgumentException: Empty PCollection accessed as a 
singleton view. Consider setting withDefault to provide a default value

Second, it doesn't seem to work. In the logs I can see that RetrieveMap is
called every hour, but the pipeline that uses the side input still gets stale
data.

Is there a real working example for how to make a side input refresh 
periodically? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
