dmvk commented on a change in pull request #15994:
URL: https://github.com/apache/beam/pull/15994#discussion_r768145300
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
}
}
+ public List<ByteBuffer> getGlobalWindowStateKeys() {
Review comment:
@reuvenlax It may not be enough. Take the DoFn below as an example. If
we cannot rely on having internal state for each key that we've seen,
wouldn't the example simply break?
```
new DoFn<KV<String, String>, Void>() {
  @ProcessElement
  public void process(@Element KV<String, String> element) {
    // write element to DB
  }

  @OnWindowExpiration
  public void expire() {
    // delete element from DB... because you know, we can :)
  }
}
```
I have a feeling that we either:
- need to rely on state that we know always exists for every key, or
- cannot optimize away the end-of-global-window (EOGW) GC timers when an
@OnWindowExpiration callback is present, and have to rely on those
timers instead
Another "broken" example would be something along these lines:
```
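new DoFn<KV<String, String>, Void>() {
  @StateId("buf")
  private final StateSpec<BagState<String>> bufSpec = StateSpecs.bag();

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buf") BagState<String> buf) {
    buf.add(element.getValue());
    // Flush once more than 3 elements are buffered, which leaves the
    // state empty. Iterables is Guava's; db is a placeholder DB client.
    if (Iterables.size(buf.read()) > 3) {
      buf.read().forEach(db::write);
      buf.clear();
    }
  }

  @OnWindowExpiration
  public void expire(@StateId("buf") BagState<String> buf) {
    // Flush remaining elements ... if we have any
    buf.read().forEach(db::write);
  }
}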
```
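To make the concern concrete, here is a toy model in plain Java. It is
only a sketch and uses no Beam or Flink API: the class `ExpirationSketch`
and all of its methods are hypothetical names made up for illustration.
It contrasts enumerating keys from user-visible state with enumerating
them from a runner-owned "seen" marker that is written for every key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of per-key state in the global window (hypothetical, not Beam API). */
class ExpirationSketch {
  // User-visible state: may be absent for a key (never written, or cleared).
  private final Map<String, List<String>> userState = new LinkedHashMap<>();
  // Runner-owned marker: recorded for every key that was ever processed.
  private final Set<String> seenKeys = new LinkedHashSet<>();

  void process(String key, String value, boolean writeUserState) {
    seenKeys.add(key);
    if (writeUserState) {
      userState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }
  }

  void clearUserState(String key) {
    userState.remove(key);
  }

  /** Keys that would get an expiration callback if we only look at user state. */
  List<String> expireFromUserState() {
    return new ArrayList<>(userState.keySet());
  }

  /** Keys that would get an expiration callback if we track every seen key. */
  List<String> expireFromSeenKeys() {
    return new ArrayList<>(seenKeys);
  }
}
```

In this model, a key handled by a stateless DoFn (the first example) or
a key whose buffer was just flushed and cleared (the second example)
shows up only in `expireFromSeenKeys()`, so under the first strategy its
expiration callback would never run.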