curcur commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r699097517



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializer.java
##########
@@ -0,0 +1,260 @@
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+public class PeriodicMaterializer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+    /**
+     * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+     * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+     */
+    private static final CheckpointOptions CHECKPOINT_OPTIONS =
+            new CheckpointOptions(
+                    CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+    /** task mailbox executor, execute from Task Thread. */
+    private final MailboxExecutor mailboxExecutor;
+
+    /** Async thread pool, to complete async phase of materialization. */
+    private final ExecutorService asyncOperationsThreadPool;
+
+    /** scheduled executor, periodically trigger materialization. */
+    private final ScheduledExecutorService periodicExecutor;
+
+    private final CheckpointStreamFactory streamFactory;
+
+    private final AsyncExceptionHandler asyncExceptionHandler;
+
+    private final String subtaskName;
+
+    private final long periodicMaterializeDelay;
+
+    /** Allowed number of consecutive materialization failures. */
+    private final int allowedNumberOfFailures;
+
+    /** Number of consecutive materialization failures. */
+    private final AtomicInteger numberOfConsecutiveFailures;
+
+    private long materializedId;
+
+    PeriodicMaterializer(
+            MailboxExecutor mailboxExecutor,
+            ExecutorService asyncOperationsThreadPool,
+            CheckpointStorageWorkerView checkpointStorageWorkerView,
+            String subtaskName,
+            AsyncExceptionHandler asyncExceptionHandler,
+            long periodicMaterializeDelay,
+            int allowedNumberOfFailures) {
+        this.mailboxExecutor = mailboxExecutor;
+        this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+
+        this.subtaskName = subtaskName;
+        this.periodicMaterializeDelay = periodicMaterializeDelay;
+        this.asyncExceptionHandler = asyncExceptionHandler;
+        this.periodicExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory(
+                                "periodic-materialization-scheduler-" + 
subtaskName));
+        this.streamFactory = shared -> 
checkpointStorageWorkerView.createTaskOwnedStateStream();
+        this.allowedNumberOfFailures = allowedNumberOfFailures;
+        this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+        this.materializedId = 0;
+
+        scheduleNextMaterialization();
+    }
+
+    @VisibleForTesting
+    public void triggerMaterialization() {
+        mailboxExecutor.execute(
+                () -> {
+                    // synchronize phase
+                    SequenceNumber upTo =
+                            
stateChangelogWriter.lastAppendedSequenceNumber().next();
+
+                    if 
(upTo.equals(changelogStateBackendState.lastMaterializedTo())) {
+
+                        scheduleNextMaterialization();
+
+                        LOG.info(
+                                "Task {} has no state updates since last 
materialization, "
+                                        + "skip this one and schedule the next 
one in {} seconds",
+                                subtaskName,
+                                periodicMaterializeDelay / 1000);
+
+                        return;
+                    }
+
+                    RunnableFuture<SnapshotResult<KeyedStateHandle>> 
materializedRunnableFuture =
+                            keyedStateBackend.snapshot(
+                                    // This ID is not needed for 
materialization;
+                                    // But since we are re-using the 
streamFactory
+                                    // that is designed for state backend 
snapshot,
+                                    // which requires unique checkpoint ID.
+                                    // A faked materialized Id is provided 
here.
+                                    // TODO: implement its own streamFactory.
+                                    materializedId++,
+                                    System.currentTimeMillis(),
+                                    streamFactory,
+                                    CHECKPOINT_OPTIONS);

Review comment:
       It would be logically unclear if this part (the sync part) were moved
to the changelog state backend.
   In that case, scheduling would happen in both the materializer and the
changelog state backend.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to