curcur commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r734220286
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -531,18 +541,113 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
return is;
}
- private void completeRestore(Collection<ChangelogStateBackendHandle> stateHandles) {
- if (!stateHandles.isEmpty()) {
- synchronized (materialized) { // ensure visibility
- for (ChangelogStateBackendHandle h : stateHandles) {
- if (h != null) {
- materialized.addAll(h.getMaterializedStateHandles());
- restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
- }
- }
+ public void registerCloseable(@Nullable Closeable closeable) {
+ closer.register(closeable);
+ }
+
+ private ChangelogSnapshotState completeRestore(
+ Collection<ChangelogStateBackendHandle> stateHandles) {
+
+ List<KeyedStateHandle> materialized = new ArrayList<>();
+ List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>();
+
+ for (ChangelogStateBackendHandle h : stateHandles) {
+ if (h != null) {
+ materialized.addAll(h.getMaterializedStateHandles());
+ restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
}
}
+
changelogStates.clear();
+ return new ChangelogSnapshotState(
+ materialized,
+ restoredNonMaterialized,
+ stateChangelogWriter.initialSequenceNumber());
+ }
+
+ /**
+ * Initialize state materialization so that materialized data can be persisted durably and
+ * included into the checkpoint.
+ *
+ * <p>This method is not thread safe. It should be called either under a lock or through task
+ * mailbox executor.
+ *
+ * @return a tuple of - future snapshot result from the underlying state backend - a {@link
+ * SequenceNumber} identifying the latest change in the changelog
+ */
+ public Optional<MaterializationRunnable> initMaterialization() throws Exception {
+ SequenceNumber upTo = getLastAppendedTo();
+ SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo();
+
+ LOG.info(
+ "Initialize Materialization. Current changelog writers last append to sequence number {}",
+ upTo);
+
+ if (upTo.compareTo(lastMaterializedTo) > 0) {
+
+ LOG.info("Starting materialization from {} : {}", lastMaterializedTo, upTo);
+
+ return Optional.of(
+ new MaterializationRunnable(
+ keyedStateBackend.snapshot(
+ // This ID is not needed for materialization;
+ // But since we are re-using the streamFactory
+ // that is designed for state backend snapshot,
+ // which requires unique checkpoint ID.
+ // A faked materialized Id is provided here.
+ // TODO: implement its own streamFactory.
+ materializedId++,
+ System.currentTimeMillis(),
Review comment:
Do you mean that `System.currentTimeMillis()` should be replaced with `materializedId++`?
That would be confusing and unnecessary. We materialize every 10 minutes, so that's at
most one call per 10 minutes.
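
To make the point concrete, here is a minimal sketch, not the PR's code. `MaterializationSketch`, its `long`-typed sequence numbers, and the `{id, timestamp}` return value are hypothetical stand-ins, and unlike the real backend it advances `lastMaterializedTo` eagerly. It only shows that the counter and the wall-clock timestamp serve different purposes, and that the clock is read once per materialization that actually starts:

```java
import java.util.Optional;

/** Hypothetical stand-in for the backend; NOT the actual ChangelogKeyedStateBackend. */
class MaterializationSketch {
    /** Fake, monotonically increasing ID handed to the snapshot call. */
    private long materializedId = 0L;

    /** Sequence number up to which changes are already materialized (simplified to a long). */
    private long lastMaterializedTo;

    MaterializationSketch(long lastMaterializedTo) {
        this.lastMaterializedTo = lastMaterializedTo;
    }

    /**
     * Returns {id, timestamp} if changes were appended since the last materialization,
     * otherwise an empty Optional.
     */
    Optional<long[]> initMaterialization(long upTo) {
        if (upTo <= lastMaterializedTo) {
            // Nothing new in the changelog, so neither the ID nor the clock is touched.
            return Optional.empty();
        }
        // Simplification: the sketch advances the marker right away.
        lastMaterializedTo = upTo;
        long id = materializedId++;                  // unique ID for the stream factory
        long timestamp = System.currentTimeMillis(); // one clock read per materialization
        return Optional.of(new long[] {id, timestamp});
    }
}
```

With a periodic trigger of 10 minutes, `initMaterialization` runs at most once per interval, so the single `System.currentTimeMillis()` call is negligible, and keeping it separate from the ID preserves the meaning of each argument.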