tillrohrmann commented on a change in pull request #17027:
URL: https://github.com/apache/flink/pull/17027#discussion_r698366255
##########
File path:
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
##########
@@ -158,24 +161,34 @@ public void open(Configuration parameters) throws Exception {
stateDescriptor.setQueryable(QsConstants.QUERY_NAME);
state = getRuntimeContext().getMapState(stateDescriptor);
- updateCount();
-
- LOG.info("Open {} with a count of {}.", getClass().getSimpleName(), count);
- }
-
- private void updateCount() throws Exception {
- count = Iterables.size(state.keys());
+ countsAtCheckpoint = new HashMap<>();
+ count = -1;
+ lastCompletedCheckpoint = -1;
}
@Override
public void flatMap(Email value, Collector<Object> out) throws Exception {
state.put(value.getEmailId(), new EmailInformation(value));
- updateCount();
+ count = Iterables.size(state.keys());
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
- System.out.println("Count on snapshot: " + count); // we look for it in the test
+ if (checkpointId > lastCompletedCheckpoint) {
Review comment:
Yes, it is a safeguard so that we don't output decreasing values.
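To illustrate the guard, here is a minimal stand-alone sketch, not the code from this PR. It assumes the count is recorded per checkpoint ID when the snapshot is taken and that completion notifications may arrive out of order. The class name `MonotonicCheckpointReporter` and the `snapshot` and `reported` methods are hypothetical; only `countsAtCheckpoint`, `lastCompletedCheckpoint`, and the `checkpointId > lastCompletedCheckpoint` check come from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the guard; it does not depend on Flink. */
public class MonotonicCheckpointReporter {
    // Count that was current when each checkpoint was taken (assumed bookkeeping).
    private final Map<Long, Integer> countsAtCheckpoint = new HashMap<>();
    private long lastCompletedCheckpoint = -1;
    // Stands in for the values the real job would print for the test to read.
    private final List<Integer> reported = new ArrayList<>();

    /** Remember the count associated with the given checkpoint. */
    public void snapshot(long checkpointId, int count) {
        countsAtCheckpoint.put(checkpointId, count);
    }

    /** Report a count only for a checkpoint newer than the last reported one. */
    public void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId > lastCompletedCheckpoint) {
            Integer count = countsAtCheckpoint.get(checkpointId);
            if (count != null) {
                reported.add(count);
                lastCompletedCheckpoint = checkpointId;
            }
        }
        // Entries at or below this checkpoint ID are no longer needed.
        countsAtCheckpoint.keySet().removeIf(id -> id <= checkpointId);
    }

    public List<Integer> reported() {
        return reported;
    }
}
```

With this sketch, a late notification for an older checkpoint is ignored, so the reported sequence never steps back to a smaller count.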