Many thanks for your help. Actually, my use case emits the entire map
every time, so I guess I'm good to go with discarding mode.
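
As a rough illustration of the difference between the two modes, here is a
simplified plain-Java model of the behaviour described in the quoted replies
below. It is not Beam code: the class PaneModes and the method
viewAfterFirings are made up for this sketch, and it assumes the view is
built from the contents of the latest pane only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not part of Beam: shows what a multimap view would
// contain after several trigger firings under each accumulation mode.
public class PaneModes {

    // firings: the elements that arrived before each trigger firing.
    // accumulating: true models accumulating mode, false models discarding mode.
    static Map<String, List<Integer>> viewAfterFirings(
            List<List<Map.Entry<String, Integer>>> firings, boolean accumulating) {
        List<Map.Entry<String, Integer>> pane = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> firing : firings) {
            if (!accumulating) {
                // Discarding mode: the pane only holds elements since the last firing.
                pane = new ArrayList<>();
            }
            pane.addAll(firing);
        }
        // Assumption: the view is built by grouping the latest pane by key.
        // Insertion order is kept here only for readability; per the thread,
        // Beam gives no ordering guarantee for the values of a key.
        Map<String, List<Integer>> view = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : pane) {
            view.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return view;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> firings = List.of(
                List.of(Map.entry("A", 1), Map.entry("B", 1)),
                List.of(Map.entry("B", 2)));
        System.out.println("discarding:   " + viewAfterFirings(firings, false));
        System.out.println("accumulating: " + viewAfterFirings(firings, true));
    }
}
```

In this model the discarding view ends up as {B=[2]} and the accumulating
view as {A=[1], B=[1, 2]}. If every firing re-emits the whole map, as in my
case, the discarding view is simply the latest full map.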

This test reproduces the issue:
https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala#L19-L53

Hope it helps

On Mon, Jun 4, 2018 at 9:04 PM Lukasz Cwik <lc...@google.com> wrote:

> Carlos, can you provide a test/code snippet for the bug that shows the
> issue?
>
> On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> +dev@beam.apache.org
>> Note that this is likely a bug in the DirectRunner for accumulation mode,
>> filed: https://issues.apache.org/jira/browse/BEAM-4470
>>
>> Discarding mode is meant to always reflect the latest firing. The catch is
>> that you need to emit the entire map every time. If you can do this,
>> then it makes sense to use discarding mode. Otherwise, if your first
>> trigger firing produces (A, 1), (B, 1) and your second firing produces
>> (B, 2), the multimap will only contain (B, 2) and (A, 1) will have been
>> discarded.
>>
>> To my knowledge, there is no guarantee about the order in which the
>> values are combined. You will need to use some piece of information about
>> the element to figure out which is the latest (or encode some additional
>> information along with each element to make this easy).
>>
>> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> I've improved the example a little and added some tests
>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>>
>>> The behaviour is slightly different, possibly because the runners
>>> (Dataflow/Direct) are implemented differently, but it is still not working.
>>>
>>> Now what happens is that although the internal PCollection gets updated,
>>> the view isn't. This is happening regardless of the accumulation mode.
>>>
>>> Regarding the accumulation mode on Dataflow... That was it!! Now the
>>> sets contain all the items. One more question, though: is the ordering
>>> within the set deterministic? (i.e., can I assume that the latest will
>>> always be in the last position of the Iterable object?)
>>>
>>> Also... given that for my particular case I only want the latest
>>> version, would you advise me to go ahead with Discarding mode?
>>>
>>> Regards
>>>
>>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> The trigger definition in the sample code you have is using discarding
>>>> firing mode. Try swapping to using accumulating mode.
>>>>
>>>>
>>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> But I think what I'm experiencing is quite different. Basically the
>>>>> side input is updated, but only one element is found on the Iterable that
>>>>> is the value of any key of the multimap.
>>>>>
>>>>> I mean, no concatenation seems to be happening. On the linked thread,
>>>>> Kenn suggests that every firing will add the new value to the set of
>>>>> values for the emitted key, but what I'm experiencing is that the new
>>>>> value is there, but just by itself (i.e., it is the only element in
>>>>> the set).
>>>>>
>>>>> @Robert, I'm using
>>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>>
>>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> An alternative to the thread that Kenn linked (adding support for
>>>>>> retractions) is to add explicit support for combiners into side
>>>>>> inputs. The system currently works by using a hardcoded concatenating
>>>>>> combiner, so maps, lists, iterables, singletons, multimaps all work by
>>>>>> concatenating the set of values emitted and then turning it into a
>>>>>> view, which is why it is an error for a singleton and map view if the
>>>>>> trigger fires multiple times.
>>>>>>
>>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <k...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> It is actually long-standing and the solution is known but hard.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone!!
>>>>>>>>
>>>>>>>> Working with multimap based side inputs on the global window I'm
>>>>>>>> experiencing something unexpected (at least to me) that I'd like to
>>>>>>>> share with you to clarify.
>>>>>>>>
>>>>>>>> The way I understand multimaps is that when one emits two values
>>>>>>>> for the same key for the same window (obvious thing here as I'm
>>>>>>>> working on the Global one), the newly emitted values are appended to
>>>>>>>> the Iterable collection that is the value for that particular key on
>>>>>>>> the map.
>>>>>>>>
>>>>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>>>>> implemented with PCollectionViews):
>>>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>>>
>>>>>>>> The steps to reproduce are:
>>>>>>>> 1. Create one table on the target BQ
>>>>>>>> 2. Run the job
>>>>>>>> 3. Patch the table on BQ (add one field), this should generate a
>>>>>>>> new TableSchema for the corresponding TableReference
>>>>>>>> 4. An updated value of the number of fields appears in the logs, but
>>>>>>>> there is only one element within the iterable, as if it had been
>>>>>>>> updated instead of appended!!
>>>>>>>>
>>>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>
