zentol commented on a change in pull request #17027:
URL: https://github.com/apache/flink/pull/17027#discussion_r698253003



##########
File path: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
##########
@@ -158,24 +161,34 @@ public void open(Configuration parameters) throws Exception {
 
             stateDescriptor.setQueryable(QsConstants.QUERY_NAME);
             state = getRuntimeContext().getMapState(stateDescriptor);
-            updateCount();
-
-            LOG.info("Open {} with a count of {}.", getClass().getSimpleName(), count);
-        }
-
-        private void updateCount() throws Exception {
-            count = Iterables.size(state.keys());
+            countsAtCheckpoint = new HashMap<>();
+            count = -1;
+            lastCompletedCheckpoint = -1;
         }
 
         @Override
         public void flatMap(Email value, Collector<Object> out) throws Exception {
             state.put(value.getEmailId(), new EmailInformation(value));
-            updateCount();
+            count = Iterables.size(state.keys());
         }
 
         @Override
         public void notifyCheckpointComplete(long checkpointId) {
-            System.out.println("Count on snapshot: " + count); // we look for it in the test
+            if (checkpointId > lastCompletedCheckpoint) {

Review comment:
       Is this required to ensure the count (as seen from the test) does not 
decrease?
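
       For context, a minimal standalone sketch of what such a guard could look like. It borrows the field names from the hunk (countsAtCheckpoint, count, lastCompletedCheckpoint), but the class CheckpointCountTracker, the snapshot() method, and the choice to return the count instead of printing it are hypothetical, since the rest of the change is not visible in this hunk. It assumes the count is recorded per checkpoint ID when the snapshot is taken, and that a completion notification for an older checkpoint should be ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the function in the PR; no Flink dependency.
public class CheckpointCountTracker {
    // Count captured when each checkpoint was taken (assumed behaviour).
    private final Map<Long, Integer> countsAtCheckpoint = new HashMap<>();
    private int count = -1;
    private long lastCompletedCheckpoint = -1;

    // Stands in for the count update done in flatMap().
    public void setCount(int newCount) {
        count = newCount;
    }

    // Stands in for a snapshot hook: remember the count for this checkpoint.
    public void snapshot(long checkpointId) {
        countsAtCheckpoint.put(checkpointId, count);
    }

    // Returns the count to report, or null if the notification is stale
    // (its ID is not newer than the last completed checkpoint) or unknown.
    public Integer notifyCheckpointComplete(long checkpointId) {
        Integer recorded = countsAtCheckpoint.remove(checkpointId);
        if (checkpointId > lastCompletedCheckpoint) {
            lastCompletedCheckpoint = checkpointId;
            return recorded;
        }
        return null;
    }
}
```

       With this shape, a notification for checkpoint 1 that arrives after checkpoint 2 has completed reports nothing, so the sequence of reported counts follows checkpoint order.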





