Simple question: are you expecting to see prior results for the same key and window, and not seeing them? I ask because state is scoped per key and window.
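
To make "per key and window" concrete, here is a rough plain-Kotlin model of the scoping. It is not Beam code: `StateScope`, `ScopedCounter`, and the window labels `"w1"`/`"w2"` are hypothetical names made up for illustration, and a real runner manages this storage itself. The point is only that a counter written for one (key, window) pair is invisible to any other pair.

```kotlin
// Plain-Kotlin model of state scoping; NOT Beam code.
// StateScope and ScopedCounter are hypothetical names for illustration.

// One state cell exists per distinct (key, window) pair.
data class StateScope(val key: String, val window: String)

class ScopedCounter {
    private val cells = mutableMapOf<StateScope, Long>()

    // Mirrors the read / increment / write pattern from the DoFn in the question.
    fun process(key: String, window: String): Long {
        val scope = StateScope(key, window)
        val updated = (cells[scope] ?: 0L) + 1 // a missing cell behaves like a null read
        cells[scope] = updated
        return updated
    }
}

fun main() {
    val counter = ScopedCounter()
    println(counter.process("a", "w1")) // 1: first element for (a, w1)
    println(counter.process("a", "w1")) // 2: same key and window, earlier write is visible
    println(counter.process("b", "w1")) // 1: different key, starts empty
    println(counter.process("a", "w2")) // 1: same key but different window, starts empty
}
```

If this model matches what is happening in your pipeline, a read would come back null whenever each element lands in a key or window that has not been written to before.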

On Fri, Jun 12, 2020 at 3:09 PM Rion Williams <[email protected]> wrote:

> Hi all,
>
> I've been toying around with stateful DoFns recently and was attempting
> some approaches involving buffering when I realized that it seemed that my
> existing state was being ignored in the following DoFn:
>
> ```
> class ExampleStatefulDoFn(): DoFn<KV<String, ExampleRecord>, KV<String, ExampleRecord>>() {
>     @StateId("count")
>     private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())
>
>     @ProcessElement
>     fun processElement(@StateId("count") countState: ValueState<Long>) {
>         var recordsBuffered = countState.read() ?: 0
>         recordsBuffered++
>
>         // Update the state
>         countState.write(recordsBuffered)
>
>         // Do something later
>     }
> }
> ```
>
> It seems that every time my DoFn gets hit, the value of my state is always
> null. Does anything look particularly off with the above definition? I know
> in the past I've encountered some issues related to Kotlin and typing,
> however I tried to be as explicit as possible with all of the type
> declarations and coders.
>
> There aren't any errors, it just seems like the state.write() operation
> doesn't seem to acknowledge the updated value or persist it outside of the
> context of the processElement operation. This is just reading from a set of
> data using the LocalRunner on my own machine, so there shouldn't be
> anything specific to any given runner.
>
> Any ideas would be welcome!
>
> Thanks,
>
> Rion
