igalshilman commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513369798
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##########
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws IOException {
checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream);
- for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) {
- checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey());
+ final Iterable<Integer> assignedKeyGroupIds =
+ checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+ // the underlying checkpointed raw stream, requires that all key groups assigned
+ // to this operator must be written to the underlying stream.
Review comment:
I think it would be better to write the empty key groups anyway,
because they are presented to us on restore.
Having a header there would also help us differentiate between
versions.
But I can definitely add a TODO to revisit this after 19748 is merged.
Would that work for you?
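
To make the suggestion concrete, here is a rough, standalone sketch of what I mean. It does not use the Flink or StateFun classes: `KeyGroupHeaderSketch`, `VERSION`, `writeKeyGroup`, `readKeyGroup`, and `writeAll` are all hypothetical names, and the `[version][count][elements]` layout is only an assumption for illustration, not the format used in this PR. The point is that every assigned key group gets a section with a header, even when nothing is buffered for it, so the restore path can always read a version before anything else.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyGroupHeaderSketch {
  // Hypothetical format version; not taken from the PR.
  static final int VERSION = 1;

  // Writes a single key group section as [version][count][elements...].
  // An empty key group still produces the 8-byte header.
  static byte[] writeKeyGroup(List<String> elements) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(VERSION);
      out.writeInt(elements.size());
      for (String element : elements) {
        out.writeUTF(element);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads a section back, checking the version header first.
  static List<String> readKeyGroup(byte[] section) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(section))) {
      int version = in.readInt();
      if (version != VERSION) {
        throw new IllegalStateException("Unknown version: " + version);
      }
      int count = in.readInt();
      List<String> elements = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        elements.add(in.readUTF());
      }
      return elements;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Iterates over ALL assigned key group ids, not only those with buffered
  // elements, so that empty groups are written with a header as well.
  static Map<Integer, byte[]> writeAll(
      Iterable<Integer> assignedKeyGroupIds, Map<Integer, List<String>> buffered) {
    Map<Integer, byte[]> sections = new TreeMap<>();
    for (Integer keyGroupId : assignedKeyGroupIds) {
      List<String> elements = buffered.getOrDefault(keyGroupId, Collections.emptyList());
      sections.put(keyGroupId, writeKeyGroup(elements));
    }
    return sections;
  }
}
```

With this shape, the restore side never has to special-case a key group with no bytes, and a later format change would only need a new version value in the header.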