Check out GroupIntoBatches.
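
In case a sketch helps: GroupIntoBatches batches per key (and window), just like state, so with every record keyed differently you would first need to re-key onto a small set of shard keys. Something along these lines might work. Treat it as a rough, untested sketch: the shape of ExampleRecord, the lookupAll helper, and the SHARDS and BATCH_SIZE values are all placeholders I made up, not anything from your pipeline.

```kotlin
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.GroupIntoBatches
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import java.io.Serializable

// Hypothetical stand-in for the ExampleRecord type from the thread.
data class ExampleRecord(val id: String, val enriched: String? = null) : Serializable

// Assumed values; tune for your throughput and the data store's limits.
const val SHARDS = 4
const val BATCH_SIZE = 100L

// Hypothetical stand-in for the external data store: one call for many keys.
fun lookupAll(keys: List<String>): Map<String, String> =
    keys.associateWith { "enriched-$it" }

// Re-key each record onto one of a few shard keys so that many records
// share a key; batching (like state) is scoped per key and window.
class AssignShardFn : DoFn<KV<String, ExampleRecord>, KV<Int, KV<String, ExampleRecord>>>() {
    @ProcessElement
    fun processElement(c: ProcessContext) {
        val element = c.element()
        c.output(KV.of(Math.floorMod(element.key.hashCode(), SHARDS), element))
    }
}

// Issue one lookup per batch, then emit the records under their original keys.
class EnrichBatchFn :
    DoFn<KV<Int, Iterable<@JvmSuppressWildcards KV<String, ExampleRecord>>>, KV<String, ExampleRecord>>() {
    @ProcessElement
    fun processElement(c: ProcessContext) {
        val batch = c.element().value.toList()
        val enrichments = lookupAll(batch.map { it.key })
        for (kv in batch) {
            c.output(KV.of(kv.key, kv.value.copy(enriched = enrichments[kv.key])))
        }
    }
}

fun enrichInBatches(
    records: PCollection<KV<String, ExampleRecord>>
): PCollection<KV<String, ExampleRecord>> =
    records
        .apply("AssignShard", ParDo.of(AssignShardFn()))
        .apply("BatchPerShard", GroupIntoBatches.ofSize<Int, KV<String, ExampleRecord>>(BATCH_SIZE))
        .apply("EnrichBatch", ParDo.of(EnrichBatchFn()))
```

The number of shard keys is a trade-off: fewer shards fill batches faster but limit parallelism. A batch that never reaches the target size is only emitted once its window expires, which matters if you stay in the global window with unbounded input.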

On Fri, Jun 12, 2020 at 3:53 PM Rion Williams wrote:

> Hi Luke,
>
> I think that’s likely my mistake. I had forgotten that state is tied to a
> given key and window. In this example use case, every record is keyed
> differently (so no two records share a key and window), and knowing that,
> I’m quite sure that’s the issue.
>
> In this scenario I’m experimenting with an external service enrichment
> pattern and was hoping to buffer n records before issuing a single request
> to the data store for the appropriate keys (i.e., every n records, issue
> one request for all the keys in that batch, then enrich and emit).
>
> Just trying to strike the balance between throughput and not issuing more
> requests than necessary to the source. Any recommendations? Perhaps just a
> global window would address this? I’ll be happy to answer any questions I
> can on the implementation / data side if you have any.
>
> Thanks,
>
> Rion
>
> On Jun 12, 2020, at 5:22 PM, Luke Cwik wrote:
>
> Simple question: are you expecting to see prior results under the same
> window and key, which you are not seeing (since state is per key and window)?
>
> On Fri, Jun 12, 2020 at 3:09 PM Rion Williams wrote:
>
>> Hi all,
>>
>> I've been toying around with stateful DoFns recently and was attempting
>> some approaches involving buffering when I noticed that my existing state
>> seemed to be ignored in the following DoFn:
>>
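>> ```
>> import org.apache.beam.sdk.coders.VarLongCoder
>> import org.apache.beam.sdk.state.StateSpec
>> import org.apache.beam.sdk.state.StateSpecs
>> import org.apache.beam.sdk.state.ValueState
>> import org.apache.beam.sdk.transforms.DoFn
>> import org.apache.beam.sdk.values.KV
>>
>> class ExampleStatefulDoFn : DoFn<KV<String, ExampleRecord>, KV<String, ExampleRecord>>() {
>>     // State cell holding the number of records buffered so far
>>     @StateId("count")
>>     private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())
>>
>>     @ProcessElement
>>     fun processElement(@StateId("count") countState: ValueState<Long>) {
>>         // read() returns null until a value has been written for this key and window
>>         var recordsBuffered = countState.read() ?: 0L
>>         recordsBuffered++
>>
>>         // Update the state
>>         countState.write(recordsBuffered)
>>
>>         // Do something later
>>     }
>> }
>> ```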
>>
>> It seems that every time my DoFn gets hit, the value of my state is
>> always null. Does anything look particularly off with the above definition?
>> I know in the past I've encountered some issues related to Kotlin and
>> typing; however, I tried to be as explicit as possible with all of the type
>> declarations and coders.
>>
>> There aren't any errors; the state.write() operation just doesn't seem to
>> acknowledge the updated value or persist it outside of the context of the
>> processElement operation. This is just reading from a set of data using
>> the LocalRunner on my own machine, so there shouldn't be anything specific
>> to any given runner.
>>
>> Any ideas would be welcome!
>>
>> Thanks,
>>
>> Rion
>>
>
