Hi all,

I've been experimenting with stateful DoFns recently and was trying out some 
approaches involving buffering when I noticed that my existing state seemed to 
be ignored in the following DoFn:

```
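import org.apache.beam.sdk.coders.VarLongCoder
import org.apache.beam.sdk.state.StateSpec
import org.apache.beam.sdk.state.StateSpecs
import org.apache.beam.sdk.state.ValueState
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.DoFn.StateId
import org.apache.beam.sdk.values.KV

class ExampleStatefulDoFn : DoFn<KV<String, ExampleRecord>, KV<String, ExampleRecord>>() {
    @StateId("count")
    private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())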

    @ProcessElement
    fun processElement(@StateId("count") countState: ValueState<Long>) {
        // read() returns null until something has been written for this key and window
        var recordsBuffered = countState.read() ?: 0L
        recordsBuffered++
        
        // Update the state
        countState.write(recordsBuffered)
        
        // Do something later
    }
}
```

Every time my DoFn is invoked, the value of my state is null. Does anything 
look particularly off in the definition above? I've run into issues related to 
Kotlin and typing in the past, so I tried to be as explicit as possible with 
all of the type declarations and coders.

There aren't any errors; the countState.write() call just doesn't appear to 
persist the updated value beyond a single processElement invocation. I'm only 
reading from a set of data using the LocalRunner on my own machine, so this 
shouldn't be specific to any particular runner.
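
A minimal sketch for narrowing this down, under some assumptions: Beam scopes state per key and window, so printing the key and window next to the value read shows whether successive elements actually share a state cell. `DebugStatefulDoFn`, the `String` value type (a stand-in for `ExampleRecord`), and the three `Create.of` elements are hypothetical and not part of my real pipeline. It also assumes a runner such as the direct runner is on the classpath.

```kotlin
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.VarLongCoder
import org.apache.beam.sdk.state.StateSpec
import org.apache.beam.sdk.state.StateSpecs
import org.apache.beam.sdk.state.ValueState
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.Element
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.DoFn.StateId
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.values.KV

// Hypothetical debugging variant of the DoFn above; String stands in for ExampleRecord.
class DebugStatefulDoFn : DoFn<KV<String, String>, KV<String, String>>() {
    @StateId("count")
    private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())

    @ProcessElement
    fun processElement(
        @Element element: KV<String, String>,
        window: BoundedWindow,
        @StateId("count") countState: ValueState<Long>,
        out: OutputReceiver<KV<String, String>>
    ) {
        // State is scoped to (key, window), so log both alongside the value read.
        val previous: Long? = countState.read()
        println("key=${element.key} window=$window previous=$previous")

        countState.write((previous ?: 0L) + 1)
        out.output(element)
    }
}

fun main() {
    // Assumed test input: two elements share key "a", one uses key "b".
    val pipeline = Pipeline.create()
    pipeline
        .apply(Create.of(KV.of("a", "x"), KV.of("a", "y"), KV.of("b", "z")))
        .apply(ParDo.of(DebugStatefulDoFn()))
    pipeline.run().waitUntilFinish()
}
```

If state is being persisted, I would expect the second element for key "a" to log a non-null previous value. If every line shows a different key or a different window, a null read each time would be consistent with how state is scoped rather than with a failed write.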

Any ideas would be welcome! 

Thanks,

Rion
