curcur commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r699097517
########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializer.java ########## @@ -0,0 +1,260 @@ +package org.apache.flink.state.changelog; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; + +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. 
+ */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final String subtaskName; + + private final long periodicMaterializeDelay; + + /** Allowed number of consecutive materialization failures. */ + private final int allowedNumberOfFailures; + + /** Number of consecutive materialization failures. */ + private final AtomicInteger numberOfConsecutiveFailures; + + private long materializedId; + + PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + CheckpointStorageWorkerView checkpointStorageWorkerView, + String subtaskName, + AsyncExceptionHandler asyncExceptionHandler, + long periodicMaterializeDelay, + int allowedNumberOfFailures) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + + this.subtaskName = subtaskName; + this.periodicMaterializeDelay = periodicMaterializeDelay; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + "periodic-materialization-scheduler-" + subtaskName)); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.numberOfConsecutiveFailures = new AtomicInteger(0); + + this.materializedId = 0; + + scheduleNextMaterialization(); + } 
+ + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // synchronize phase + SequenceNumber upTo = + stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(changelogStateBackendState.lastMaterializedTo())) { + + scheduleNextMaterialization(); + + LOG.info( + "Task {} has no state updates since last materialization, " + + "skip this one and schedule the next one in {} seconds", + subtaskName, + periodicMaterializeDelay / 1000); + + return; + } + + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + // This ID is not needed for materialization; + // But since we are re-using the streamFactory + // that is designed for state backend snapshot, + // which requires unique checkpoint ID. + // A faked materialized Id is provided here. + // TODO: implement its own streamFactory. + materializedId++, + System.currentTimeMillis(), + streamFactory, + CHECKPOINT_OPTIONS); Review comment: It is logically unclear whether this part (the sync part) should be moved to the changelog state backend. If it were moved, scheduling would happen in both the materializer and the ChangelogStateBackend. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org