tzulitai commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513184588



##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##########
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws 
IOException {
     checkState(keyedStateOutputStream != null, "Trying to flush envelopes not 
in a logging state");
 
     final DataOutputView target = new 
DataOutputViewStreamWrapper(keyedStateOutputStream);
-    for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) 
{
-      checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, 
entry.getKey());
+    final Iterable<Integer> assignedKeyGroupIds =
+        checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+    // the underlying checkpointed raw stream, requires that all key groups 
assigned
+    // to this operator must be written to the underlying stream.

Review comment:
       nit: I'm wondering if it makes sense to add a TODO here to help remind 
us in the future that after FLINK-19748 (allow skipping key groups) is merged, 
we may choose to revert writing empty key groups?

##########
File path: 
statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
##########
@@ -79,6 +81,21 @@ public void roundTripWithSpill() throws Exception {
     roundTrip(1_000_000, 0);
   }
 
+  @Test
+  public void testHeader() throws IOException {

Review comment:
       As I understand it, this test verifies the header serde round trip, in 
the case that the header was written.
   
   As a counterpart, could you add a test that verifies 
`Header.skipHeaderSilently` is effectively a no-op if the header was missing in 
the input stream?
   
   i.e.,
   another variant of this test where the line 
`UnboundedFeedbackLogger.Header.writeHeader(out);` is removed should be passing 
as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to