curcur commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r699097517
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializer.java
##########
@@ -0,0 +1,260 @@
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+public class PeriodicMaterializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+ /**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details
in FLINK-23441.
+ */
+ private static final CheckpointOptions CHECKPOINT_OPTIONS =
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault());
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final CheckpointStreamFactory streamFactory;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final String subtaskName;
+
+ private final long periodicMaterializeDelay;
+
+ /** Allowed number of consecutive materialization failures. */
+ private final int allowedNumberOfFailures;
+
+ /** Number of consecutive materialization failures. */
+ private final AtomicInteger numberOfConsecutiveFailures;
+
+ private long materializedId;
+
+ /**
+ * Creates a materializer for one subtask and schedules its first materialization.
+ *
+ * <p>Besides storing its arguments, the constructor creates a dedicated single-thread
+ * scheduled executor named {@code "periodic-materialization-scheduler-" + subtaskName}.
+ * It also creates a stream factory that always returns task-owned state streams.
+ * Both the consecutive-failure counter and the materialization id start at 0.
+ *
+ * <p>NOTE(review): {@code scheduleNextMaterialization()} is called as the last step of
+ * construction, and its body is not visible in this hunk. If it submits work to
+ * {@code periodicExecutor}, that work may run before the caller has finished wiring up
+ * this object. An explicit {@code start()} method would make the lifecycle clearer.
+ * The executor created here also needs to be shut down somewhere (not visible in this hunk).
+ *
+ * @param mailboxExecutor task mailbox executor; the synchronous phase of a materialization is
+ * submitted to it (see {@code triggerMaterialization})
+ * @param asyncOperationsThreadPool thread pool for the asynchronous phase of materialization
+ * @param checkpointStorageWorkerView used only to create task-owned state streams for
+ * snapshots; it is captured by the stream factory and not stored as a field
+ * @param subtaskName used in log messages and in the scheduler thread name
+ * @param asyncExceptionHandler stored for later use; presumably reports asynchronous
+ * materialization failures (usage not visible in this hunk)
+ * @param periodicMaterializeDelay delay before the next materialization; the log message in
+ * {@code triggerMaterialization} divides it by 1000 to report seconds, so it is presumably
+ * in milliseconds
+ * @param allowedNumberOfFailures allowed number of consecutive materialization failures
+ */
+ PeriodicMaterializer(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ CheckpointStorageWorkerView checkpointStorageWorkerView,
+ String subtaskName,
+ AsyncExceptionHandler asyncExceptionHandler,
+ long periodicMaterializeDelay,
+ int allowedNumberOfFailures) {
+ this.mailboxExecutor = mailboxExecutor;
+ this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+
+ this.subtaskName = subtaskName;
+ this.periodicMaterializeDelay = periodicMaterializeDelay;
+ this.asyncExceptionHandler = asyncExceptionHandler;
+ // Dedicated scheduler thread per subtask; the name includes the subtask for diagnosability.
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ "periodic-materialization-scheduler-" +
subtaskName));
+ // The 'shared' argument is ignored: every snapshot stream is created as task-owned state.
+ this.streamFactory = shared ->
checkpointStorageWorkerView.createTaskOwnedStateStream();
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+ // Fake id passed to keyedStateBackend.snapshot(); incremented per materialization.
+ this.materializedId = 0;
+
+ // Start the periodic cycle; all fields above are assigned before this call.
+ scheduleNextMaterialization();
+ }
+
+ @VisibleForTesting
+ public void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ // synchronize phase
+ SequenceNumber upTo =
+
stateChangelogWriter.lastAppendedSequenceNumber().next();
+
+ if
(upTo.equals(changelogStateBackendState.lastMaterializedTo())) {
+
+ scheduleNextMaterialization();
+
+ LOG.info(
+ "Task {} has no state updates since last
materialization, "
+ + "skip this one and schedule the next
one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ return;
+ }
+
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture =
+ keyedStateBackend.snapshot(
+ // This ID is not needed for
materialization;
+ // But since we are re-using the
streamFactory
+ // that is designed for state backend
snapshot,
+ // which requires unique checkpoint ID.
+ // A faked materialized Id is provided
here.
+ // TODO: implement its own streamFactory.
+ materializedId++,
+ System.currentTimeMillis(),
+ streamFactory,
+ CHECKPOINT_OPTIONS);
Review comment:
Moving this part (the synchronous phase) into the changelog state backend
would make the logic unclear: scheduling would then happen both in the
materializer and in the ChangelogStateBackend.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]