rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716867123
##########
File path: pom.xml
##########
@@ -1518,7 +1518,7 @@ under the License.
random: enable it
randomly, unless explicitly set
unset: don't alter the
configuration
-->
-
<checkpointing.changelog>random</checkpointing.changelog>
+
<checkpointing.changelog>on</checkpointing.changelog>
Review comment:
Is this change temporary, i.e. should it be dropped before merging?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final String subtaskName;
+
+ private final long periodicMaterializeDelay;
+
+ /** Allowed number of consecutive materialization failures. */
+ private final int allowedNumberOfFailures;
+
+ /** Number of consecutive materialization failures. */
+ private final AtomicInteger numberOfConsecutiveFailures;
+
+ private final ChangelogKeyedStateBackend<?> keyedStateBackend;
+
+ private boolean started = false;
+
+ PeriodicMaterializationManager(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ String subtaskName,
+ AsyncExceptionHandler asyncExceptionHandler,
+ ChangelogKeyedStateBackend<?> keyedStateBackend,
+ long periodicMaterializeDelay,
+ int allowedNumberOfFailures) {
+ this.mailboxExecutor = checkNotNull(mailboxExecutor);
+ this.asyncOperationsThreadPool =
checkNotNull(asyncOperationsThreadPool);
+ this.subtaskName = checkNotNull(subtaskName);
+ this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+ this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+ this.periodicMaterializeDelay = periodicMaterializeDelay;
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ "periodic-materialization-scheduler-" +
subtaskName));
+ }
+
+ public void start() {
+ if (!started) {
+
+ started = true;
+
+ LOG.info(
+ "Task {} starts periodic materialization, scheduling the
next one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
+ }
+ }
+
+ private void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ Optional<MaterializationRunnable>
materializationRunnableOptional =
+ keyedStateBackend.initMaterialization();
+
+ if (materializationRunnableOptional.isPresent()) {
+ MaterializationRunnable runnable =
materializationRunnableOptional.get();
+ asyncOperationsThreadPool.execute(
+ () ->
+ asyncMaterializationPhase(
+
runnable.getMaterializationRunnable(),
+ runnable.getMaterializedTo()));
+ } else {
+ scheduleNextMaterialization();
+
+ LOG.info(
+ "Task {} has no state updates since last
materialization, "
+ + "skip this one and schedule the next
one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+ }
+ },
+ "materialization");
+ }
+
+ private void asyncMaterializationPhase(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
+ SequenceNumber upTo) {
+
+ SnapshotResult<KeyedStateHandle> materializedSnapshot =
+ uploadSnapshot(materializedRunnableFuture);
+
+ // if succeed, update state and finish up
+ if (materializedSnapshot != null) {
+
+ numberOfConsecutiveFailures.set(0);
+
+ final SnapshotResult<KeyedStateHandle> copyMaterializedSnapshot =
materializedSnapshot;
+
+ mailboxExecutor.execute(
+ () ->
+ keyedStateBackend.updateChangelogSnapshotState(
+ copyMaterializedSnapshot,
+ upTo),
+ "Task {} update materializedSnapshot up to changelog
sequence number: {}",
+ subtaskName,
+ upTo);
+ }
+
+ LOG.info(
+ "Task {} schedules the next materialization in {} seconds.",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
+ }
+
+ private SnapshotResult<KeyedStateHandle> uploadSnapshot(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture) {
+
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ try {
+ FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture);
+
+ LOG.debug("Task {} finishes asynchronous part of
materialization.", subtaskName);
+
+ return materializedRunnableFuture.get();
+
+ } catch (Exception e) {
+ int retryTime = numberOfConsecutiveFailures.incrementAndGet();
+
+ LOG.info(
+ "Task {} asynchronous part of materialization is not
completed for the {} time.",
+ subtaskName,
+ retryTime,
+ e);
+
+ discardFailedUploads(materializedRunnableFuture);
+
+ if (retryTime >= allowedNumberOfFailures) {
+ // Fail the task externally, this causes task failover
+ asyncExceptionHandler.handleAsyncException(
+ "Task "
+ + subtaskName
+ + " fails to complete the asynchronous part of
materialization",
+ e);
+ }
+ } finally {
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ }
+
+ return null;
+ }
+
+ private void discardFailedUploads(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture) {
+
+ LOG.info("Task {} cleanup asynchronous runnable for materialization.",
subtaskName);
+
+ if (materializedRunnableFuture != null) {
+ // materialization has started
+ if (!materializedRunnableFuture.cancel(true)) {
+ try {
+ StateObject stateObject = materializedRunnableFuture.get();
+ if (stateObject != null) {
+ stateObject.discardState();
+ }
+ } catch (Exception ex) {
+ LOG.debug(
+ "Task "
+ + subtaskName
+ + " cancelled execution of snapshot future
runnable. "
+ + "Cancellation produced the following "
+ + "exception, which is expected and can be
ignored.",
+ ex);
+ }
+ }
+ }
+ }
+
+ // Only be called in the task thread to simplify the threading model
+ private void scheduleNextMaterialization() {
+ periodicExecutor.schedule(
+ this::triggerMaterialization, periodicMaterializeDelay,
TimeUnit.MILLISECONDS);
+ }
Review comment:
Should we prevent scheduling if the executor is already shut down or is being
shut down? Otherwise, I'm afraid we can get `RejectedExecutionException`s.
I think this can be solved by having a flag (or some state) that is
synchronized with `close()`.
WDYT?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final String subtaskName;
+
+ private final long periodicMaterializeDelay;
+
+ /** Allowed number of consecutive materialization failures. */
+ private final int allowedNumberOfFailures;
+
+ /** Number of consecutive materialization failures. */
+ private final AtomicInteger numberOfConsecutiveFailures;
+
+ private final ChangelogKeyedStateBackend<?> keyedStateBackend;
+
+ private boolean started = false;
+
+ PeriodicMaterializationManager(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ String subtaskName,
+ AsyncExceptionHandler asyncExceptionHandler,
+ ChangelogKeyedStateBackend<?> keyedStateBackend,
+ long periodicMaterializeDelay,
+ int allowedNumberOfFailures) {
+ this.mailboxExecutor = checkNotNull(mailboxExecutor);
+ this.asyncOperationsThreadPool =
checkNotNull(asyncOperationsThreadPool);
+ this.subtaskName = checkNotNull(subtaskName);
+ this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+ this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+ this.periodicMaterializeDelay = periodicMaterializeDelay;
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ "periodic-materialization-scheduler-" +
subtaskName));
+ }
+
+ public void start() {
+ if (!started) {
+
+ started = true;
+
+ LOG.info(
+ "Task {} starts periodic materialization, scheduling the
next one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
+ }
+ }
+
+ private void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ Optional<MaterializationRunnable>
materializationRunnableOptional =
+ keyedStateBackend.initMaterialization();
+
+ if (materializationRunnableOptional.isPresent()) {
+ MaterializationRunnable runnable =
materializationRunnableOptional.get();
+ asyncOperationsThreadPool.execute(
+ () ->
+ asyncMaterializationPhase(
+
runnable.getMaterializationRunnable(),
+ runnable.getMaterializedTo()));
+ } else {
+ scheduleNextMaterialization();
+
+ LOG.info(
+ "Task {} has no state updates since last
materialization, "
+ + "skip this one and schedule the next
one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+ }
+ },
+ "materialization");
+ }
+
+ private void asyncMaterializationPhase(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
+ SequenceNumber upTo) {
+
+ SnapshotResult<KeyedStateHandle> materializedSnapshot =
+ uploadSnapshot(materializedRunnableFuture);
+
+ // if succeed, update state and finish up
+ if (materializedSnapshot != null) {
+
+ numberOfConsecutiveFailures.set(0);
+
+ final SnapshotResult<KeyedStateHandle> copyMaterializedSnapshot =
materializedSnapshot;
+
+ mailboxExecutor.execute(
+ () ->
+ keyedStateBackend.updateChangelogSnapshotState(
+ copyMaterializedSnapshot,
+ upTo),
+ "Task {} update materializedSnapshot up to changelog
sequence number: {}",
+ subtaskName,
+ upTo);
+ }
+
+ LOG.info(
+ "Task {} schedules the next materialization in {} seconds.",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
Review comment:
I think this method is currently called even if the retry limit is exceeded.
This can be avoided by moving part of the error-handling logic from
`uploadSnapshot` to `asyncMaterializationPhase`, so that `uploadSnapshot` only
deals with discarding the failed upload and closing the safety net, while
`asyncMaterializationPhase` catches any errors, handles retries, and schedules
the next materialization if needed.
As a side effect, we'd also get rid of signalling an error with a `null` result.
WDYT?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
##########
@@ -76,4 +120,145 @@ public static TestTaskStateManager
createTaskStateManager(File changelogStorageP
Path.fromLocalFile(changelogStoragePath),
false, 1024))
.build();
}
+
+ public static void testMaterializedRestore(
+ StateBackend stateBackend, Environment env,
CheckpointStreamFactory streamFactory)
+ throws Exception {
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+ TypeInformation<StateBackendTestBase.TestPojo> pojoType =
+ new GenericTypeInfo<>(StateBackendTestBase.TestPojo.class);
+ ValueStateDescriptor<StateBackendTestBase.TestPojo> kvId =
+ new ValueStateDescriptor<>("id", pojoType);
+
+ ChangelogKeyedStateBackend<Integer> keyedBackend =
+ (ChangelogKeyedStateBackend<Integer>)
createKeyedBackend(stateBackend, env);
+
+ CompletableFuture<Void> asyncComplete = new CompletableFuture<>();
+ PeriodicMaterializationManager periodicMaterializationManager =
+ new PeriodicMaterializationManager(
+ checkNotNull(env.getMainMailboxExecutor()),
+ checkNotNull(env.getAsyncOperationsThreadPool()),
+ env.getTaskInfo().getTaskNameWithSubtasks(),
+ (message, exception) ->
asyncComplete.completeExceptionally(exception),
+ keyedBackend,
+ 10,
+ 1);
+
+ periodicMaterializationManager.start();
+
+ try {
+ ValueState<StateBackendTestBase.TestPojo> state =
+ keyedBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ keyedBackend.setCurrentKey(1);
+ state.update(new StateBackendTestBase.TestPojo("u1", 1));
+
+ keyedBackend.setCurrentKey(2);
+ state.update(new StateBackendTestBase.TestPojo("u2", 2));
+
+ Thread.sleep(100);
Review comment:
I'm assuming this part isn't finished yet.
Otherwise, relying on a fixed `Thread.sleep(100)` to wait for materialization
will make the test unstable.
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ChangelogOptions.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+/** A collection of all configuration options that relate to changelog. */
+public class ChangelogOptions {
Review comment:
WDYT about marking the class as `@PublicEvolving` and adding "state" to its
name, e.g. `StateChangelogOptions`?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final String subtaskName;
+
+ private final long periodicMaterializeDelay;
+
+ /** Allowed number of consecutive materialization failures. */
+ private final int allowedNumberOfFailures;
+
+ /** Number of consecutive materialization failures. */
+ private final AtomicInteger numberOfConsecutiveFailures;
+
+ private final ChangelogKeyedStateBackend<?> keyedStateBackend;
+
+ private boolean started = false;
+
+ PeriodicMaterializationManager(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ String subtaskName,
+ AsyncExceptionHandler asyncExceptionHandler,
+ ChangelogKeyedStateBackend<?> keyedStateBackend,
+ long periodicMaterializeDelay,
+ int allowedNumberOfFailures) {
+ this.mailboxExecutor = checkNotNull(mailboxExecutor);
+ this.asyncOperationsThreadPool =
checkNotNull(asyncOperationsThreadPool);
+ this.subtaskName = checkNotNull(subtaskName);
+ this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+ this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+ this.periodicMaterializeDelay = periodicMaterializeDelay;
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ "periodic-materialization-scheduler-" +
subtaskName));
+ }
+
+ public void start() {
+ if (!started) {
+
+ started = true;
+
+ LOG.info(
+ "Task {} starts periodic materialization, scheduling the
next one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
+ }
+ }
+
+ private void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ Optional<MaterializationRunnable>
materializationRunnableOptional =
+ keyedStateBackend.initMaterialization();
+
+ if (materializationRunnableOptional.isPresent()) {
+ MaterializationRunnable runnable =
materializationRunnableOptional.get();
+ asyncOperationsThreadPool.execute(
+ () ->
+ asyncMaterializationPhase(
+
runnable.getMaterializationRunnable(),
+ runnable.getMaterializedTo()));
+ } else {
+ scheduleNextMaterialization();
+
+ LOG.info(
+ "Task {} has no state updates since last
materialization, "
+ + "skip this one and schedule the next
one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+ }
+ },
+ "materialization");
+ }
+
+ private void asyncMaterializationPhase(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
+ SequenceNumber upTo) {
+
+ SnapshotResult<KeyedStateHandle> materializedSnapshot =
+ uploadSnapshot(materializedRunnableFuture);
+
+ // if succeed, update state and finish up
+ if (materializedSnapshot != null) {
+
+ numberOfConsecutiveFailures.set(0);
+
+ final SnapshotResult<KeyedStateHandle> copyMaterializedSnapshot =
materializedSnapshot;
+
+ mailboxExecutor.execute(
+ () ->
+ keyedStateBackend.updateChangelogSnapshotState(
+ copyMaterializedSnapshot,
+ upTo),
+ "Task {} update materializedSnapshot up to changelog
sequence number: {}",
+ subtaskName,
+ upTo);
+ }
+
+ LOG.info(
+ "Task {} schedules the next materialization in {} seconds.",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
+ }
+
+ private SnapshotResult<KeyedStateHandle> uploadSnapshot(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture) {
+
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ try {
+ FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture);
+
+ LOG.debug("Task {} finishes asynchronous part of
materialization.", subtaskName);
+
+ return materializedRunnableFuture.get();
+
+ } catch (Exception e) {
+ int retryTime = numberOfConsecutiveFailures.incrementAndGet();
+
+ LOG.info(
+ "Task {} asynchronous part of materialization is not
completed for the {} time.",
+ subtaskName,
+ retryTime,
+ e);
+
+ discardFailedUploads(materializedRunnableFuture);
+
+ if (retryTime >= allowedNumberOfFailures) {
+ // Fail the task externally, this causes task failover
+ asyncExceptionHandler.handleAsyncException(
+ "Task "
+ + subtaskName
+ + " fails to complete the asynchronous part of
materialization",
+ e);
+ }
+ } finally {
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ }
+
+ return null;
+ }
+
+ private void discardFailedUploads(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture) {
+
+ LOG.info("Task {} cleanup asynchronous runnable for materialization.",
subtaskName);
+
+ if (materializedRunnableFuture != null) {
+ // materialization has started
+ if (!materializedRunnableFuture.cancel(true)) {
+ try {
+ StateObject stateObject = materializedRunnableFuture.get();
+ if (stateObject != null) {
+ stateObject.discardState();
+ }
+ } catch (Exception ex) {
+ LOG.debug(
+ "Task "
+ + subtaskName
+ + " cancelled execution of snapshot future
runnable. "
+ + "Cancellation produced the following "
+ + "exception, which is expected and can be
ignored.",
+ ex);
+ }
+ }
+ }
+ }
+
+ // Only be called in the task thread to simplify the threading model
+ private void scheduleNextMaterialization() {
+ periodicExecutor.schedule(
+ this::triggerMaterialization, periodicMaterializeDelay,
TimeUnit.MILLISECONDS);
+ }
+
+ public void close() {
+ if (!periodicExecutor.isShutdown()) {
+ periodicExecutor.shutdownNow();
+ }
+ started = false;
Review comment:
Maybe we should not reset this flag; otherwise, we would also have to reset
other fields, such as `numberOfConsecutiveFailures`.
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ChangelogOptions.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+/** A collection of all configuration options that relate to changelog. */
+public class ChangelogOptions {
+
+ @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ public static final ConfigOption<Duration>
PERIODIC_MATERIALIZATION_INTERVAL =
+
ConfigOptions.key("state.backend.changelog.periodic-materialize.interval")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(10))
+ .withDescription(
+ "Defines the interval in milliseconds to perform "
+ + "periodic materialization for state
backend.");
+
+ @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ public static final ConfigOption<Integer>
MATERIALIZATION_MAX_FAILURES_ALLOWED =
+ ConfigOptions.key("state.backend.changelog.max.failures.allowed")
Review comment:
How about `state.backend.changelog.max-failures`?
(I think the dot is already used to separate sections in these keys.)
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/test/resources/log4j2-test.properties
##########
@@ -18,11 +18,11 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO
Review comment:
The common approach is to disable logging by default and raise the level on
demand when debugging. Can't we do the same here?
Ditto for the changed log pattern.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##########
@@ -258,4 +259,10 @@ default void setAsyncOperationsThreadPool(ExecutorService
executorService) {}
default ExecutorService getAsyncOperationsThreadPool() {
throw new UnsupportedOperationException();
}
+
+ default void setCheckpointStorageAccess(CheckpointStorageAccess
checkpointStorageAccess) {}
+
+ default CheckpointStorageAccess getCheckpointStorageAccess() {
+ throw new UnsupportedOperationException();
+ }
Review comment:
I think this change (and the related ones) belongs in a different commit.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final String subtaskName;
+
+ private final long periodicMaterializeDelay;
+
+ /** Allowed number of consecutive materialization failures. */
+ private final int allowedNumberOfFailures;
+
+ /** Number of consecutive materialization failures. */
+ private final AtomicInteger numberOfConsecutiveFailures;
+
+ private final ChangelogKeyedStateBackend<?> keyedStateBackend;
+
+ private boolean started = false;
+
+ PeriodicMaterializationManager(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ String subtaskName,
+ AsyncExceptionHandler asyncExceptionHandler,
+ ChangelogKeyedStateBackend<?> keyedStateBackend,
+ long periodicMaterializeDelay,
+ int allowedNumberOfFailures) {
+ this.mailboxExecutor = checkNotNull(mailboxExecutor);
+ this.asyncOperationsThreadPool =
checkNotNull(asyncOperationsThreadPool);
+ this.subtaskName = checkNotNull(subtaskName);
+ this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+ this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+ this.periodicMaterializeDelay = periodicMaterializeDelay;
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ "periodic-materialization-scheduler-" +
subtaskName));
+ }
+
+ public void start() {
+ if (!started) {
+
+ started = true;
+
+ LOG.info(
+ "Task {} starts periodic materialization, scheduling the
next one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
+ }
+ }
+
+ private void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ Optional<MaterializationRunnable>
materializationRunnableOptional =
+ keyedStateBackend.initMaterialization();
+
+ if (materializationRunnableOptional.isPresent()) {
+ MaterializationRunnable runnable =
materializationRunnableOptional.get();
+ asyncOperationsThreadPool.execute(
+ () ->
+ asyncMaterializationPhase(
+
runnable.getMaterializationRunnable(),
+ runnable.getMaterializedTo()));
+ } else {
+ scheduleNextMaterialization();
+
+ LOG.info(
+ "Task {} has no state updates since last
materialization, "
+ + "skip this one and schedule the next
one in {} seconds",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+ }
+ },
+ "materialization");
+ }
+
+ private void asyncMaterializationPhase(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
+ SequenceNumber upTo) {
+
+ SnapshotResult<KeyedStateHandle> materializedSnapshot =
+ uploadSnapshot(materializedRunnableFuture);
+
+ // if succeed, update state and finish up
+ if (materializedSnapshot != null) {
+
+ numberOfConsecutiveFailures.set(0);
+
+ final SnapshotResult<KeyedStateHandle> copyMaterializedSnapshot =
materializedSnapshot;
+
+ mailboxExecutor.execute(
+ () ->
+ keyedStateBackend.updateChangelogSnapshotState(
+ copyMaterializedSnapshot,
+ upTo),
+ "Task {} update materializedSnapshot up to changelog
sequence number: {}",
+ subtaskName,
+ upTo);
+ }
+
+ LOG.info(
+ "Task {} schedules the next materialization in {} seconds.",
+ subtaskName,
+ periodicMaterializeDelay / 1000);
+
+ scheduleNextMaterialization();
+ }
+
+ private SnapshotResult<KeyedStateHandle> uploadSnapshot(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture) {
+
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ try {
+ FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture);
+
+ LOG.debug("Task {} finishes asynchronous part of
materialization.", subtaskName);
+
+ return materializedRunnableFuture.get();
+
+ } catch (Exception e) {
+ int retryTime = numberOfConsecutiveFailures.incrementAndGet();
+
+ LOG.info(
+ "Task {} asynchronous part of materialization is not
completed for the {} time.",
+ subtaskName,
+ retryTime,
+ e);
+
+ discardFailedUploads(materializedRunnableFuture);
+
+ if (retryTime >= allowedNumberOfFailures) {
+ // Fail the task externally, this causes task failover
+ asyncExceptionHandler.handleAsyncException(
+ "Task "
+ + subtaskName
+ + " fails to complete the asynchronous part of
materialization",
+ e);
+ }
+ } finally {
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ }
+
+ return null;
+ }
+
+ private void discardFailedUploads(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture) {
+
+ LOG.info("Task {} cleanup asynchronous runnable for materialization.",
subtaskName);
+
+ if (materializedRunnableFuture != null) {
+ // materialization has started
+ if (!materializedRunnableFuture.cancel(true)) {
+ try {
+ StateObject stateObject = materializedRunnableFuture.get();
+ if (stateObject != null) {
+ stateObject.discardState();
+ }
+ } catch (Exception ex) {
+ LOG.debug(
+ "Task "
+ + subtaskName
+ + " cancelled execution of snapshot future
runnable. "
+ + "Cancellation produced the following "
+ + "exception, which is expected and can be
ignored.",
+ ex);
+ }
+ }
+ }
+ }
+
+ // Only be called in the task thread to simplify the threading model
+ private void scheduleNextMaterialization() {
+ periodicExecutor.schedule(
+ this::triggerMaterialization, periodicMaterializeDelay,
TimeUnit.MILLISECONDS);
+ }
+
+ public void close() {
+ if (!periodicExecutor.isShutdown()) {
+ periodicExecutor.shutdownNow();
+ }
Review comment:
I think we should wait for termination here to minimize the chance of
leaving some orphaned state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]