rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r682624049
########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/MaterializedState.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; + +import java.util.List; + +/** Materialization State, only accessed by task thread. */ +public class MaterializedState { + /** Set initially on restore and later upon materialization. */ + private final List<KeyedStateHandle> materializedSnapshot; + + /** Updated initially on restore and later cleared upon materialization. */ + private final List<ChangelogStateHandle> restoredNonMaterialized; Review comment: I think NON-materialized state should not be placed inside the MaterializedState. I guess the reason for this is the need to update the encapsulated values atomically. I proposed one way to solve this here. 
########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import 
org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. 
*/ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. */ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + 
periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); Review comment: I like that SQN and sync-snapshot are obtained close to each other, but I think the proper place for this is `ChangelogKeyedStateBackend`. That way, it would be responsible for its own consistency; the materializer, OTOH, would be responsible purely for running the (async part of) materialization and returning the results. WDYT about the following protocol: - materializer periodically requests [SQN, syncSnapshot] from the backend (`changelogBackend.materializedAndSnapshot()` returning `Tuple2<SequenceNumber, RunnableFuture<SnapshotResult<KeyedStateHandle>>>` or some class) - runs async phase - on completion it submits the results back (`changelogBackend.handleMaterializationResults(state)`) This will also solve the problem of atomically updating `materialized` and `restoredNonMaterialized` (this comment). (`MailboxExecutor` should be used for both calls, either inside or outside) ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { Review comment: 1. If the existing classes are used (as commented here); then this class doesn't need its own package; and then it doesn't have to be public. Otherwise, annotate with `@Internal`? 2. How about renaming to something like `ChangelogMaterializer`? It doesn't have to be periodic, the essential part I think is changelog (I didn't pay much attention to names in the prototype to be honest :slightly_smiling_face: ) ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; Review comment: Rename to `numberOfFailures`/`numberOfConsecutiveFailures`? ########## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ########## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; + // ---------- statebackend related configurations ------------------------------ /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; + /** Interval in milliseconds to perform periodic materialization. 
*/ + private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); Review comment: nit: can we add `Millis` or `Ms` to the variable name (as in `taskCancellationIntervalMillis`)? ditto other added fields ########## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ########## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; + // ---------- statebackend related configurations ------------------------------ /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; + /** Interval in milliseconds to perform periodic materialization. */ + private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + + /** Interval in milliseconds for initial delay of periodic materialization. */ + private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); + + /** Max allowed number of failures */ + private int materializationMaxAllowedFailures = + StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue(); Review comment: nit: the comment says the same as the variable name so it can be removed ditto other added fields ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/MaterializedState.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; + +import java.util.List; + +/** Materialization State, only accessed by task thread. */ +public class MaterializedState { + /** Set initially on restore and later upon materialization. */ + private final List<KeyedStateHandle> materializedSnapshot; + + /** Updated initially on restore and later cleared upon materialization. */ + private final List<ChangelogStateHandle> restoredNonMaterialized; + + /** + * The {@link SequenceNumber} up to which the state is materialized, exclusive. The log should + * be truncated accordingly. + */ + private final SequenceNumber materializedTo; + + public MaterializedState( + List<KeyedStateHandle> materializedSnapshot, + List<ChangelogStateHandle> restoredNonMaterialized, + SequenceNumber materializedTo) { + this.materializedSnapshot = materializedSnapshot; + this.restoredNonMaterialized = restoredNonMaterialized; + this.materializedTo = materializedTo; Review comment: `notNull` check and wrap with `unmodifiable`? 
########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import 
org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. 
*/ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. */ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + 
periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + materializedId++, // TO DISCUSS: seems this does not matter, Review comment: Could you add the results of the last discussion as a comment here? (i.e. some backends might need to use `id`; however it doesn't make sense for materialization; we should probably add a new method or store the id) ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + materializedId++, // TO DISCUSS: seems this does not matter, + // based on + System.currentTimeMillis(), + streamFactory, + CHECKPOINT_OPTIONS); + + // TODO: add metadata to log FLINK-23170 + asyncOperationsThreadPool.execute( + () -> asyncMaterialization(materializedRunnableFuture, upTo)); + }, + "materialization"); + } + + private void asyncMaterialization( + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture, + SequenceNumber upTo) { + + FileSystemSafetyNet.initializeSafetyNetForThread(); + try { + FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture); + mailboxExecutor.execute( + () -> { + materializedState = + new MaterializedState( + getMaterializedResult(materializedRunnableFuture.get()), Review comment: I think it's safer to call `future.get` from the async thread (even if it's a no-op currently). ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } Review comment: I think we should reset 
the `materializationOnGoing` flag here; otherwise it stays `true` after this early return and every subsequent trigger is skipped. ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } Review comment: I think with this approach the next materialization can be delayed if the current one is running late. I'd expect the next one to run immediately after the current one in this case. 
Alternatively, we can configure the pause between materializations instead of the interval (like we do for checkpoints). One way to achieve this is to schedule the next trigger upon materialization completion. I don't think this is very critical, unless we want to replace the interval with a pause. In that case, the configuration option will have to change as well. What do you think? ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + materializedId++, // TO DISCUSS: seems this does not matter, + // based on + System.currentTimeMillis(), + streamFactory, + CHECKPOINT_OPTIONS); + + // TODO: add metadata to log FLINK-23170 + asyncOperationsThreadPool.execute( + () -> asyncMaterialization(materializedRunnableFuture, upTo)); + }, + "materialization"); + } + + private void asyncMaterialization( + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture, + SequenceNumber upTo) { + + FileSystemSafetyNet.initializeSafetyNetForThread(); + try { + FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture); Review comment: Can we close the safety net right after this statement (`runIfNotDoneAndGet`)? I think it would make the flow easier to follow and possible exceptions won't affect each other. ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + materializedId++, // TO DISCUSS: seems this does not matter, + // based on + System.currentTimeMillis(), + streamFactory, + CHECKPOINT_OPTIONS); + + // TODO: add metadata to log FLINK-23170 + asyncOperationsThreadPool.execute( + () -> asyncMaterialization(materializedRunnableFuture, upTo)); + }, + "materialization"); + } + + private void asyncMaterialization( + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture, + SequenceNumber upTo) { + + FileSystemSafetyNet.initializeSafetyNetForThread(); + try { + FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture); + mailboxExecutor.execute( + () -> { + materializedState = + new MaterializedState( + getMaterializedResult(materializedRunnableFuture.get()), + new ArrayList<>(), + upTo); + + checkState( + materializationOnGoing.compareAndSet(true, false), + "expect to finish materialization successfully, " + + "flag materializationOnGoing should be true before finishing."); + retries.set(0); + }, + "update materializedSnapshot up to changelog sequence number: {}", + upTo); + } catch (Exception e) { + int retryTime = retries.incrementAndGet(); + + LOG.info( + "Asynchronous part of materialization could not be completed for the {} time.", + retryTime, + e); + + // ToDO: Double check + // how the async phase is related to materialized state tracking + // refer to AsyncCheckpointRunnable#run#catch + // cleanup logic in StateUtil#discardStateFuture Review comment: This should be fine (for tracking we need a Tracker anyway, and I think it should not affect this part of code). ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); Review comment: nit: add `-scheduler` to name? and ideally task info (`env.getTaskInfo().getTaskNameWithSubtasks()`) ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; Review comment: nit: rename to `snapshotable`? ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + materializedId++, // TO DISCUSS: seems this does not matter, + // based on + System.currentTimeMillis(), + streamFactory, + CHECKPOINT_OPTIONS); + + // TODO: add metadata to log FLINK-23170 + asyncOperationsThreadPool.execute( + () -> asyncMaterialization(materializedRunnableFuture, upTo)); + }, + "materialization"); + } + + private void asyncMaterialization( + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture, + SequenceNumber upTo) { + + FileSystemSafetyNet.initializeSafetyNetForThread(); + try { + FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture); + mailboxExecutor.execute( + () -> { + materializedState = + new MaterializedState( + getMaterializedResult(materializedRunnableFuture.get()), + new ArrayList<>(), + upTo); + + checkState( + materializationOnGoing.compareAndSet(true, false), Review comment: I think logging of successful materialization would be helpful here (`DEBUG` level?). ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + materializedId++, // TO DISCUSS: seems this does not matter, + // based on + System.currentTimeMillis(), + streamFactory, + CHECKPOINT_OPTIONS); + + // TODO: add metadata to log FLINK-23170 + asyncOperationsThreadPool.execute( + () -> asyncMaterialization(materializedRunnableFuture, upTo)); + }, + "materialization"); + } + + private void asyncMaterialization( + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture, + SequenceNumber upTo) { + + FileSystemSafetyNet.initializeSafetyNetForThread(); + try { + FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture); + mailboxExecutor.execute( + () -> { + materializedState = + new MaterializedState( + getMaterializedResult(materializedRunnableFuture.get()), + new ArrayList<>(), + upTo); + + checkState( + materializationOnGoing.compareAndSet(true, false), + "expect to finish materialization successfully, " + + "flag materializationOnGoing should be true before finishing."); + retries.set(0); + }, + "update materializedSnapshot up to changelog sequence number: {}", + upTo); + } catch (Exception e) { + int retryTime = retries.incrementAndGet(); + + LOG.info( + "Asynchronous part of materialization could not be completed for the {} time.", + retryTime, + e); + + // ToDO: Double check + // how the async phase is related to materialized state tracking + // refer to AsyncCheckpointRunnable#run#catch + // cleanup logic in StateUtil#discardStateFuture + handleExecutionException(materializedRunnableFuture); + + if (retryTime == allowedNumberOfFailures) { + asyncExceptionHandler.handleAsyncException( + "Fail to complete the asynchronous part of materialization", e); + } Review comment: 1. I think we should either reset failure counter here or check it before each materialization. Otherwise, we depend on asyncExceptionHandler for consistent behaviour. 2. Replace `==` with `>=`?
########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import 
org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. 
*/ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. */ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + 
periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( Review comment: Should we handle any possible exceptions here to honour retry limit? ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ########## @@ -329,37 +332,47 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) // materialization may truncate only a part of the previous result and the backend would // have to split it somehow for the former option, so the latter is used. lastCheckpointId = checkpointId; - lastUploadedFrom = materializedTo; + lastUploadedFrom = periodicMaterializer.getMaterializedState().lastMaterializedTo(); lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); LOG.debug( "snapshot for checkpoint {}, change range: {}..{}", checkpointId, lastUploadedFrom, lastUploadedTo); + + MaterializedState materializedStateCopy = periodicMaterializer.getMaterializedState(); + return toRunnableFuture( stateChangelogWriter .persist(lastUploadedFrom) - .thenApply(this::buildSnapshotResult)); + .thenApply(delta -> buildSnapshotResult(delta, materializedStateCopy))); } - private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle delta) { - // Can be called by either task thread during the sync checkpoint phase (if persist future - // was already completed); or by the writer thread otherwise. 
So need to synchronize. - // todo: revisit after FLINK-21357 - use mailbox action? - synchronized (materialized) { - // collections don't change once started and handles are immutable - List<ChangelogStateHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized); - if (delta != null && delta.getStateSize() > 0) { - prevDeltaCopy.add(delta); - } - if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) { - return SnapshotResult.empty(); - } else { - return SnapshotResult.of( - new ChangelogStateBackendHandleImpl( - materialized, prevDeltaCopy, getKeyGroupRange())); - } + @Override + @VisibleForTesting + public void triggerMaterialization() { + periodicMaterializer.triggerMaterialization(); + } Review comment: This is very..practical :slightly_smiling_face: I think a similar effect can be achieved by using some custom `ScheduledExecutorService` in Materializer (e.g. `ManuallyTriggeredScheduledExecutorService`) ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. */ + private final AtomicBoolean materializationOnGoing; Review comment: I think this can be boolean (non-atomic) - as it's only accessed by a single (task) thread.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ########## @@ -316,6 +319,11 @@ public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() { return operatorEventGateway; } + @Override + public ThroughputCalculator getThroughputMeter() { + return throughputCalculator; + } + Review comment: nit: unrelated change ########## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ########## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; + // ---------- statebackend related configurations ------------------------------ /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; + /** Interval in milliseconds to perform periodic materialization. */ + private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + + /** Interval in milliseconds for initial delay of periodic materialization. */ + private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); Review comment: Why isn't `periodicMaterializeInterval` sufficient? 
########## File path: flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java ########## @@ -88,4 +88,35 @@ .defaultValue(128) .withDescription( "Defines the number of measured latencies to maintain at each state access operation."); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_MATERIALIZATION) + public static final ConfigOption<Long> PERIODIC_MATERIALIZATION_INTERVAL = + ConfigOptions.key("state.backend.periodic-materialize.interval") + .longType() + .defaultValue(10000L) + .withDescription( + "Defines the interval in milliseconds to perform periodic materialization for state backend."); Review comment: I think these options should be placed together with other `state.backend.changelog*` options (`CheckpointingOptions`) and use the same `COMMON_STATE_BACKENDS` section. WDYT? ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/AsyncExceptionHandler.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.runtime.execution.Environment; + +/** Handling exceptions thrown by non-task thread. */ +public class AsyncExceptionHandler { Review comment: This class looks like a copy of `StreamTaskAsyncExceptionHandler` - why didn't you use the existing one (plus interface)? ditto `AsynchronousException` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ########## @@ -345,7 +353,12 @@ public ExecutorService getAsyncOperationsThreadPool() { } @Override - public ThroughputCalculator getThroughputMeter() { - return throughputCalculator; + public void setCheckpointStorage(CheckpointStorage checkpointStorage) { + this.checkpointStorage = checkpointStorage; + } Review comment: Could we add a check here to prevent resetting the storage? And also to other recently added setters? ########## File path: flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java ########## @@ -88,4 +88,35 @@ .defaultValue(128) .withDescription( "Defines the number of measured latencies to maintain at each state access operation."); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_MATERIALIZATION) + public static final ConfigOption<Long> PERIODIC_MATERIALIZATION_INTERVAL = + ConfigOptions.key("state.backend.periodic-materialize.interval") + .longType() + .defaultValue(10000L) + .withDescription( + "Defines the interval in milliseconds to perform periodic materialization for state backend."); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_MATERIALIZATION) + public static final ConfigOption<Long> PERIODIC_MATERIALIZATION_INIT_DELAY = + ConfigOptions.key("state.backend.periodic-materialize.initial-delay") + .longType() + .defaultValue(10000L) Review comment: I think this default is too small if we are targeting 500-1000ms checkpoint duration. 
"Continuous materialization" is also an option but maybe we should start with the default of `execution.checkpointing.timeout` - 10min? ########## File path: flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java ########## @@ -88,4 +88,35 @@ .defaultValue(128) .withDescription( "Defines the number of measured latencies to maintain at each state access operation."); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_MATERIALIZATION) + public static final ConfigOption<Long> PERIODIC_MATERIALIZATION_INTERVAL = + ConfigOptions.key("state.backend.periodic-materialize.interval") + .longType() + .defaultValue(10000L) Review comment: What about `durationType`? ditto other added fields ########## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ########## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; + // ---------- statebackend related configurations ------------------------------ /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; + /** Interval in milliseconds to perform periodic materialization. */ + private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + + /** Interval in milliseconds for initial delay of periodic materialization. */ + private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); + + /** Max allowed number of failures */ + private int materializationMaxAllowedFailures = + StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue(); + + /** Flag to enable periodic materialization */ + private boolean isPeriodicMaterializationEnabled = false; Review comment: What are the use cases when this flag is false? 
########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java ########## @@ -222,8 +223,11 @@ public StateBackend configure(ReadableConfig config, ClassLoader classLoader) ttlTimeProvider, changelogStorage.createWriter(operatorIdentifier, keyGroupRange), baseState, + new AsyncExceptionHandler(env), env.getMainMailboxExecutor(), - env.getAsyncOperationsThreadPool())); + env.getAsyncOperationsThreadPool(), + env.getCheckpointStorage() + .createCheckpointStorage(env.getJobID()))); Review comment: I think it's better to pass `CheckpointStorageAccess` created in `StreamTask` as an argument, because: - The returned object is supposed to be used by multiple backends so it doesn't seem like a backend responsibility to create it - the call can also potentially create a new storage. - if it has to be closed then it will be unclear who is doing it (existing backends using it are deprecated) (at least add a todo) ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java ########## @@ -610,6 +612,10 @@ public void testSharedIncrementalStateDeRegistration() throws Exception { } } + @Override + @Test + public void testMaterializedRestore() {} + Review comment: This override prevents the test from running at all, even for changelog backend tests. ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import 
java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { + private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + + /** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ + private static final CheckpointOptions CHECKPOINT_OPTIONS = + new CheckpointOptions( + CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + + /** task mailbox executor, execute from Task Thread. */ + private final MailboxExecutor mailboxExecutor; + + /** Async thread pool, to complete async phase of materialization. */ + private final ExecutorService asyncOperationsThreadPool; + + /** scheduled executor, periodically trigger materialization. */ + private final ScheduledExecutorService periodicExecutor; + + private final CheckpointStreamFactory streamFactory; + + private final Snapshotable keyedStateBackend; + + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; + + private final AsyncExceptionHandler asyncExceptionHandler; + + private final int allowedNumberOfFailures; + + /** Materialization failure retries. */ + private final AtomicInteger retries; + + /** Making sure only one materialization on going at a time. 
*/ + private final AtomicBoolean materializationOnGoing; + + private long materializedId; + + private MaterializedState materializedState; + + public PeriodicMaterializer( + MailboxExecutor mailboxExecutor, + ExecutorService asyncOperationsThreadPool, + Snapshotable keyedStateBackend, + CheckpointStorageWorkerView checkpointStorageWorkerView, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, + AsyncExceptionHandler asyncExceptionHandler, + MaterializedState materializedState, + long periodicMaterializeInitDelay, + long periodicMaterializeInterval, + int allowedNumberOfFailures, + boolean materializationEnabled) { + this.mailboxExecutor = mailboxExecutor; + this.asyncOperationsThreadPool = asyncOperationsThreadPool; + this.keyedStateBackend = keyedStateBackend; + this.stateChangelogWriter = stateChangelogWriter; + this.asyncExceptionHandler = asyncExceptionHandler; + this.periodicExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("periodic-materialization")); + this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.allowedNumberOfFailures = allowedNumberOfFailures; + this.materializationOnGoing = new AtomicBoolean(false); + this.retries = new AtomicInteger(allowedNumberOfFailures); + + this.materializedId = 0; + this.materializedState = materializedState; + + if (materializationEnabled) { + this.periodicExecutor.scheduleAtFixedRate( + this::triggerMaterialization, + periodicMaterializeInitDelay, + periodicMaterializeInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + public void triggerMaterialization() { + mailboxExecutor.execute( + () -> { + // Only one materialization ongoing at a time + if (!materializationOnGoing.compareAndSet(false, true)) { + return; + } + + SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); + + if (upTo.equals(materializedState.lastMaterializedTo())) { + return; + } + + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture = + keyedStateBackend.snapshot( + materializedId++, // TO DISCUSS: seems this does not matter, + // based on + System.currentTimeMillis(), + streamFactory, + CHECKPOINT_OPTIONS); + + // TODO: add metadata to log FLINK-23170 + asyncOperationsThreadPool.execute( + () -> asyncMaterialization(materializedRunnableFuture, upTo)); + }, + "materialization"); + } + + private void asyncMaterialization( + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture, + SequenceNumber upTo) { + + FileSystemSafetyNet.initializeSafetyNetForThread(); + try { + FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture); + mailboxExecutor.execute( + () -> { + materializedState = + new MaterializedState( + getMaterializedResult(materializedRunnableFuture.get()), + new ArrayList<>(), + upTo); + + checkState( + materializationOnGoing.compareAndSet(true, false), + "expect to finish materialization successfully, " + + "flag materializationOnGoing should be true before finishing."); + retries.set(0); + }, + "update materializedSnapshot up to changelog sequence number: {}", + upTo); + } catch (Exception e) { + int retryTime = retries.incrementAndGet(); + + LOG.info( + "Asynchronous part of materialization could not be completed for the {} time.", + retryTime, + e); + + // ToDO: Double check + // how the async phase is related to materialized state tracking + // refer to AsyncCheckpointRunnable#run#catch + // cleanup logic in StateUtil#discardStateFuture + handleExecutionException(materializedRunnableFuture); + + if (retryTime == allowedNumberOfFailures) { + asyncExceptionHandler.handleAsyncException( + "Fail to complete the asynchronous part of materialization", e); + } + + // To simplify threading model, materializationOnGoing is only reset/updated in task + // thread + mailboxExecutor.execute( + () -> + checkState( + materializationOnGoing.compareAndSet(true, false), + "expect abort materialization 
in asynchronous phase, " + + "flag materializationOnGoing should be true before aborting."), + "abort asynchronous part of materialization for the {} time.", + retryTime); + } finally { + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); + } + } + + private void handleExecutionException( + RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture) { + + LOG.info("Cleanup asynchronous runnable for materialization."); + + if (materializedRunnableFuture != null) { + // materialization has started + if (!materializedRunnableFuture.cancel(true)) { + try { + StateObject stateObject = materializedRunnableFuture.get(); + if (stateObject != null) { + stateObject.discardState(); + } + } catch (Exception ex) { + LOG.debug( + "Cancelled execution of snapshot future runnable. " + + "Cancellation produced the following " + + "exception, which is expected and can be ignored.", + ex); + } + } + } + } + + // TODO: this method may change after the ownership PR + public static List<KeyedStateHandle> getMaterializedResult( Review comment: This can be private. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
