rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716640216
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -531,18 +539,76 @@ public void notifyCheckpointAborted(long checkpointId)
throws Exception {
return is;
}
- private void completeRestore(Collection<ChangelogStateBackendHandle>
stateHandles) {
- if (!stateHandles.isEmpty()) {
- synchronized (materialized) { // ensure visibility
- for (ChangelogStateBackendHandle h : stateHandles) {
- if (h != null) {
- materialized.addAll(h.getMaterializedStateHandles());
-
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
- }
- }
+ private ChangelogSnapshotState completeRestore(
+ Collection<ChangelogStateBackendHandle> stateHandles) {
+
+ List<KeyedStateHandle> materialized = new ArrayList<>();
+ List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>();
+
+ for (ChangelogStateBackendHandle h : stateHandles) {
+ if (h != null) {
+ materialized.addAll(h.getMaterializedStateHandles());
+
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
}
}
+
changelogStates.clear();
+ return new ChangelogSnapshotState(
+ materialized,
+ restoredNonMaterialized,
+ stateChangelogWriter.initialSequenceNumber());
+ }
+
+ /**
+ * Initialize state materialization so that materialized data can be
persisted durably and
+ * included into the checkpoint.
+ *
+ * <p>This method is not thread safe. It should be called either under a
lock or through task
+ * mailbox executor.
+ *
+ * @return a tuple of - future snapshot result from the underlying state
backend - a {@link
+ * SequenceNumber} identifying the latest change in the changelog
+ */
+ public Optional<MaterializationRunnable> initMaterialization() throws
Exception {
+ SequenceNumber upTo =
stateChangelogWriter.lastAppendedSequenceNumber();
+
+ if (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0) {
+ return Optional.of(
+ new MaterializationRunnable(
+ keyedStateBackend.snapshot(
+ // This ID is not needed for
materialization;
+ // But since we are re-using the
streamFactory
+ // that is designed for state backend
snapshot,
+ // which requires unique checkpoint ID.
+ // A faked materialized Id is provided
here.
+ // TODO: implement its own streamFactory.
+ materializedId++,
+ System.currentTimeMillis(),
+ streamFactory,
+ CHECKPOINT_OPTIONS),
+ // TODO: add metadata to log FLINK-23170
+ upTo));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * This method is not thread safe. It should be called either under a lock
or through task
+ * mailbox executor.
+ */
+ public void updateChangelogSnapshotState(
+ SnapshotResult<KeyedStateHandle> materializedSnapshot,
SequenceNumber upTo) {
+ changelogSnapshotState =
+ new ChangelogSnapshotState(
+ getMaterializedResult(materializedSnapshot), new
ArrayList<>(), upTo);
Review comment:
I mean a method like this in the `ChangelogSnapshotState` class:
```
public static ChangelogSnapshotState materialized(
SnapshotResult<KeyedStateHandle> snapshot, SequenceNumber
upTo) {
return new ChangelogSnapshotState(
getMaterializedResult(snapshot),
Collections.emptyList(), upTo);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]