rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r733855539
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -531,18 +541,113 @@ public void notifyCheckpointAborted(long checkpointId)
throws Exception {
return is;
}
- private void completeRestore(Collection<ChangelogStateBackendHandle>
stateHandles) {
- if (!stateHandles.isEmpty()) {
- synchronized (materialized) { // ensure visibility
- for (ChangelogStateBackendHandle h : stateHandles) {
- if (h != null) {
- materialized.addAll(h.getMaterializedStateHandles());
-
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
- }
- }
+ public void registerCloseable(@Nullable Closeable closeable) {
+ closer.register(closeable);
+ }
+
+ private ChangelogSnapshotState completeRestore(
+ Collection<ChangelogStateBackendHandle> stateHandles) {
+
+ List<KeyedStateHandle> materialized = new ArrayList<>();
+ List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>();
+
+ for (ChangelogStateBackendHandle h : stateHandles) {
+ if (h != null) {
+ materialized.addAll(h.getMaterializedStateHandles());
+
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
}
}
+
changelogStates.clear();
+ return new ChangelogSnapshotState(
+ materialized,
+ restoredNonMaterialized,
+ stateChangelogWriter.initialSequenceNumber());
+ }
+
+ /**
+ * Initialize state materialization so that materialized data can be
persisted durably and
+ * included into the checkpoint.
+ *
+ * <p>This method is not thread safe. It should be called either under a
lock or through task
+ * mailbox executor.
+ *
+ * @return a tuple of - future snapshot result from the underlying state
backend - a {@link
+ * SequenceNumber} identifying the latest change in the changelog
+ */
+ public Optional<MaterializationRunnable> initMaterialization() throws
Exception {
+ SequenceNumber upTo = getLastAppendedTo();
+ SequenceNumber lastMaterializedTo =
changelogSnapshotState.lastMaterializedTo();
+
+ LOG.info(
+ "Initialize Materialization. Current changelog writers last
append to sequence number {}",
+ upTo);
+
+ if (upTo.compareTo(lastMaterializedTo) > 0) {
+
+ LOG.info("Starting materialization from {} : {}",
lastMaterializedTo, upTo);
+
+ return Optional.of(
+ new MaterializationRunnable(
+ keyedStateBackend.snapshot(
+ // This ID is not needed for
materialization;
+ // But since we are re-using the
streamFactory
+ // that is designed for state backend
snapshot,
+ // which requires unique checkpoint ID.
+ // A faked materialized Id is provided
here.
+ // TODO: implement its own streamFactory.
+ materializedId++,
+ System.currentTimeMillis(),
Review comment:
WDYT about using `materializedId++` here to avoid an unnecessary system
call?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -121,7 +124,8 @@
ttlTimeProvider,
metricGroup,
baseHandles,
- cancelStreamRegistry));
+ cancelStreamRegistry),
+ cancelStreamRegistry);
Review comment:
This argument is unused (ditto 2nd constructor).
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+/** A collection of all configuration options that relate to changelog. */
+@PublicEvolving
+public class StateChangelogOptions {
+
+ @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ public static final ConfigOption<Duration>
PERIODIC_MATERIALIZATION_INTERVAL =
+
ConfigOptions.key("state.backend.changelog.periodic-materialize.interval")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(10))
+ .withDescription(
+ "Defines the interval in milliseconds to perform "
+ + "periodic materialization for state
backend.");
+
+ @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ public static final ConfigOption<Integer>
MATERIALIZATION_MAX_FAILURES_ALLOWED =
+ ConfigOptions.key("state.backend.changelog.max-failures-allowed")
+ .intType()
+ .defaultValue(3)
+ .withDescription("Max number of consecutive
materialization failures allowed.");
+
+ /** Whether to enable state change log. */
+ @Documentation.Section(value =
Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
+ ConfigOptions.key("state.backend.changelog.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable state backend to write state
changes to StateChangelog. "
+ + "If this config is not set explicitly,
it means no preference "
+ + "for enabling the change log, and the
value in lower config "
+ + "level will take effect. The default
value 'false' here means "
+ + "if no value set (job or cluster), the
change log will not be "
+ + "enabled.");
+
+ /**
+ * Which storage to use to store state changelog.
+ *
+ * <p>Recognized shortcut name is 'memory' from {@code
+ * InMemoryStateChangelogStorageFactory.getIdentifier()}, which is also
the default value.
+ */
+ @Documentation.Section(value =
Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ public static final ConfigOption<String> STATE_CHANGE_LOG_STORAGE =
+ ConfigOptions.key("state.backend.changelog.storage")
+ .stringType()
+ .defaultValue("memory")
+ .withDescription(
+ Description.builder()
+ .text("The storage to be used to store
state changelog.")
+ .linebreak()
+ .text(
+ "The implementation can be
specified via their"
+ + " shortcut name.")
+ .linebreak()
+ .text(
+ "The list of recognized shortcut
names currently includes"
+ + " 'memory' only.")
Review comment:
and 'filesystem'?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
##########
@@ -76,4 +121,155 @@ public static TestTaskStateManager
createTaskStateManager(File changelogStorageP
Path.fromLocalFile(changelogStoragePath),
false, 1024))
.build();
}
+
+ public static void testMaterializedRestore(
+ StateBackend stateBackend, Environment env,
CheckpointStreamFactory streamFactory)
+ throws Exception {
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+ TypeInformation<StateBackendTestBase.TestPojo> pojoType =
+ new GenericTypeInfo<>(StateBackendTestBase.TestPojo.class);
+ ValueStateDescriptor<StateBackendTestBase.TestPojo> kvId =
+ new ValueStateDescriptor<>("id", pojoType);
+
+ ChangelogKeyedStateBackend<Integer> keyedBackend =
+ (ChangelogKeyedStateBackend<Integer>)
createKeyedBackend(stateBackend, env);
+
+ CompletableFuture<Void> asyncComplete = new CompletableFuture<>();
+ PeriodicMaterializationManager periodicMaterializationManager =
+ new PeriodicMaterializationManager(
+ checkNotNull(env.getMainMailboxExecutor()),
+ checkNotNull(env.getAsyncOperationsThreadPool()),
+ env.getTaskInfo().getTaskNameWithSubtasks(),
+ (message, exception) ->
asyncComplete.completeExceptionally(exception),
+ keyedBackend,
+ 10,
+ 1);
+
+ periodicMaterializationManager.start();
+
+ try {
+ ValueState<StateBackendTestBase.TestPojo> state =
+ keyedBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ keyedBackend.setCurrentKey(1);
+ state.update(new StateBackendTestBase.TestPojo("u1", 1));
+
+ keyedBackend.setCurrentKey(2);
+ state.update(new StateBackendTestBase.TestPojo("u2", 2));
+
+ awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+ keyedBackend.setCurrentKey(2);
+ state.update(new StateBackendTestBase.TestPojo("u2", 22));
+
+ keyedBackend.setCurrentKey(3);
+ state.update(new StateBackendTestBase.TestPojo("u3", 3));
+
+ awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+ KeyedStateHandle snapshot =
+ runSnapshot(
+ keyedBackend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+
CheckpointOptions.forCheckpointWithDefaultLocation()),
+ sharedStateRegistry);
+
+ IOUtils.closeQuietly(keyedBackend);
+ keyedBackend.dispose();
+ periodicMaterializationManager.close();
+
+ // make sure the asycn phase completes successfully
+ assertFalse(asyncComplete.isCompletedExceptionally());
Review comment:
Just calling `asyncComplete.get()` would show the actual failure reason, so
I'd remove the assertion here.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
Review comment:
Can be package-private.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
##########
@@ -76,4 +121,155 @@ public static TestTaskStateManager
createTaskStateManager(File changelogStorageP
Path.fromLocalFile(changelogStoragePath),
false, 1024))
.build();
}
+
+ public static void testMaterializedRestore(
+ StateBackend stateBackend, Environment env,
CheckpointStreamFactory streamFactory)
+ throws Exception {
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+ TypeInformation<StateBackendTestBase.TestPojo> pojoType =
+ new GenericTypeInfo<>(StateBackendTestBase.TestPojo.class);
+ ValueStateDescriptor<StateBackendTestBase.TestPojo> kvId =
+ new ValueStateDescriptor<>("id", pojoType);
+
+ ChangelogKeyedStateBackend<Integer> keyedBackend =
+ (ChangelogKeyedStateBackend<Integer>)
createKeyedBackend(stateBackend, env);
+
+ CompletableFuture<Void> asyncComplete = new CompletableFuture<>();
+ PeriodicMaterializationManager periodicMaterializationManager =
+ new PeriodicMaterializationManager(
+ checkNotNull(env.getMainMailboxExecutor()),
+ checkNotNull(env.getAsyncOperationsThreadPool()),
+ env.getTaskInfo().getTaskNameWithSubtasks(),
+ (message, exception) ->
asyncComplete.completeExceptionally(exception),
+ keyedBackend,
+ 10,
+ 1);
+
+ periodicMaterializationManager.start();
+
+ try {
+ ValueState<StateBackendTestBase.TestPojo> state =
+ keyedBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ keyedBackend.setCurrentKey(1);
+ state.update(new StateBackendTestBase.TestPojo("u1", 1));
+
+ keyedBackend.setCurrentKey(2);
+ state.update(new StateBackendTestBase.TestPojo("u2", 2));
+
+ awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+ keyedBackend.setCurrentKey(2);
+ state.update(new StateBackendTestBase.TestPojo("u2", 22));
+
+ keyedBackend.setCurrentKey(3);
+ state.update(new StateBackendTestBase.TestPojo("u3", 3));
+
+ awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+ KeyedStateHandle snapshot =
+ runSnapshot(
+ keyedBackend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+
CheckpointOptions.forCheckpointWithDefaultLocation()),
+ sharedStateRegistry);
+
+ IOUtils.closeQuietly(keyedBackend);
+ keyedBackend.dispose();
+ periodicMaterializationManager.close();
Review comment:
Aren't we doing this twice: here and in `finally`?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -203,27 +208,55 @@ public StateBackend configure(ReadableConfig config,
ClassLoader classLoader)
KeyGroupRange keyGroupRange,
TtlTimeProvider ttlTimeProvider,
Collection<KeyedStateHandle> stateHandles,
- BaseBackendBuilder<K> baseBackendBuilder)
+ BaseBackendBuilder<K> baseBackendBuilder,
+ CloseableRegistry cancelStreamRegistry)
throws Exception {
StateChangelogStorage<?> changelogStorage =
Preconditions.checkNotNull(
env.getTaskStateManager().getStateChangelogStorage(),
"Changelog storage is null when creating and restoring"
+ " the ChangelogKeyedStateBackend.");
- return ChangelogBackendRestoreOperation.restore(
- changelogStorage.createReader(),
- env.getUserCodeClassLoader().asClassLoader(),
- castHandles(stateHandles),
- baseBackendBuilder,
- (baseBackend, baseState) ->
- new ChangelogKeyedStateBackend(
- baseBackend,
- env.getExecutionConfig(),
- ttlTimeProvider,
-
changelogStorage.createWriter(operatorIdentifier, keyGroupRange),
- baseState,
- env.getMainMailboxExecutor(),
- env.getAsyncOperationsThreadPool()));
+
+ String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
+ ExecutionConfig executionConfig = env.getExecutionConfig();
+
+ ChangelogKeyedStateBackend<K> keyedStateBackend =
+ ChangelogBackendRestoreOperation.restore(
+ changelogStorage.createReader(),
+ env.getUserCodeClassLoader().asClassLoader(),
+ castHandles(stateHandles),
+ baseBackendBuilder,
+ (baseBackend, baseState) ->
+ new ChangelogKeyedStateBackend(
+ baseBackend,
+ subtaskName,
+ executionConfig,
+ ttlTimeProvider,
+ changelogStorage.createWriter(
+ operatorIdentifier,
keyGroupRange),
+ baseState,
+ env.getCheckpointStorageAccess()));
+
+ PeriodicMaterializationManager periodicMaterializationManager =
+ new PeriodicMaterializationManager(
+ checkNotNull(env.getMainMailboxExecutor()),
+ checkNotNull(env.getAsyncOperationsThreadPool()),
+ subtaskName,
+ (message, exception) ->
+ env.failExternally(new
AsynchronousException(message, exception)),
+ keyedStateBackend,
+ executionConfig.getPeriodicMaterializeIntervalMillis(),
+
executionConfig.getMaterializationMaxAllowedFailures());
+
+ // keyedStateBackend is responsible to close
periodicMaterializationManager
+ // This indicates periodicMaterializationManager binds to the
keyedStateBackend
+ // However PeriodicMaterializationManager can not be part of
keyedStateBackend
+ // because of
Review comment:
Unfinished comment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]