rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r689777951
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -329,37 +332,47 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
// materialization may truncate only a part of the previous result and the backend would
// have to split it somehow for the former option, so the latter is used.
lastCheckpointId = checkpointId;
- lastUploadedFrom = materializedTo;
+ lastUploadedFrom = periodicMaterializer.getMaterializedState().lastMaterializedTo();
lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next();
LOG.debug(
"snapshot for checkpoint {}, change range: {}..{}",
checkpointId,
lastUploadedFrom,
lastUploadedTo);
+
+ MaterializedState materializedStateCopy = periodicMaterializer.getMaterializedState();
+
return toRunnableFuture(
stateChangelogWriter
.persist(lastUploadedFrom)
- .thenApply(this::buildSnapshotResult));
+ .thenApply(delta -> buildSnapshotResult(delta, materializedStateCopy)));
}
- private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle delta) {
- // Can be called by either task thread during the sync checkpoint phase (if persist future
- // was already completed); or by the writer thread otherwise. So need to synchronize.
- // todo: revisit after FLINK-21357 - use mailbox action?
- synchronized (materialized) {
- // collections don't change once started and handles are immutable
- List<ChangelogStateHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized);
- if (delta != null && delta.getStateSize() > 0) {
- prevDeltaCopy.add(delta);
- }
- if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
- return SnapshotResult.empty();
- } else {
- return SnapshotResult.of(
- new ChangelogStateBackendHandleImpl(
- materialized, prevDeltaCopy, getKeyGroupRange()));
- }
+ @Override
+ @VisibleForTesting
+ public void triggerMaterialization() {
+ periodicMaterializer.triggerMaterialization();
+ }
Review comment:
I tried disabling materialization by commenting out the call in
`ChangelogKeyedStateBackend.triggerMaterialization`, and the test still passed,
so I think just triggering it is not enough.
This suggests that if we want to test it at this level, we need more fine-grained
control over materialization, which the test could get through direct access
to the materializer.
And if we have such access, exposing something through the backend isn't
necessary.
But it probably makes sense to first resolve the questions about the
backend-materializer interaction (like
[this](https://github.com/apache/flink/pull/16606#discussion_r682621905) one).
WDYT?
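
To illustrate the kind of test I have in mind, here is a rough, self-contained sketch. It does not use Flink classes: `ManualMaterializer`, `ToyBackend`, `snapshotDelta` and `deltaAfter` are hypothetical stand-ins, and the "sequence number" is simply an index into a list. The point is only that the test drives the materializer directly and asserts on something that changes when materialization is skipped.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration only; none of these are Flink types. */
class MaterializationControlSketch {

    /** A materializer whose progress is driven by the test, not by a timer. */
    static final class ManualMaterializer {
        private long lastMaterializedTo = 0;

        void materializeUpTo(long sequenceNumber) {
            lastMaterializedTo = sequenceNumber;
        }

        long lastMaterializedTo() {
            return lastMaterializedTo;
        }
    }

    /** Toy backend: a snapshot contains only the changes after the materialized point. */
    static final class ToyBackend {
        private final ManualMaterializer materializer;
        private final List<String> changes = new ArrayList<>();

        ToyBackend(ManualMaterializer materializer) {
            this.materializer = materializer;
        }

        void append(String change) {
            changes.add(change);
        }

        List<String> snapshotDelta() {
            // Assumption: the sequence number doubles as an index into the change list.
            int from = (int) materializer.lastMaterializedTo();
            return new ArrayList<>(changes.subList(from, changes.size()));
        }
    }

    /** Runs the same scenario with or without a materialization step in the middle. */
    static List<String> deltaAfter(boolean materialize) {
        ManualMaterializer materializer = new ManualMaterializer();
        ToyBackend backend = new ToyBackend(materializer);
        backend.append("a");
        backend.append("b");
        if (materialize) {
            materializer.materializeUpTo(2); // the test controls this step directly
        }
        backend.append("c");
        return backend.snapshotDelta();
    }

    public static void main(String[] args) {
        System.out.println(deltaAfter(true));  // prints [c]
        System.out.println(deltaAfter(false)); // prints [a, b, c]
    }
}
```

Because the two runs produce different deltas, a test written this way would fail if the materialization step were commented out, which is the property the current test lacks.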