rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r733855539



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -531,18 +541,113 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
         return is;
     }
 
-    private void completeRestore(Collection<ChangelogStateBackendHandle> 
stateHandles) {
-        if (!stateHandles.isEmpty()) {
-            synchronized (materialized) { // ensure visibility
-                for (ChangelogStateBackendHandle h : stateHandles) {
-                    if (h != null) {
-                        materialized.addAll(h.getMaterializedStateHandles());
-                        
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
-                    }
-                }
+    public void registerCloseable(@Nullable Closeable closeable) {
+        closer.register(closeable);
+    }
+
+    private ChangelogSnapshotState completeRestore(
+            Collection<ChangelogStateBackendHandle> stateHandles) {
+
+        List<KeyedStateHandle> materialized = new ArrayList<>();
+        List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>();
+
+        for (ChangelogStateBackendHandle h : stateHandles) {
+            if (h != null) {
+                materialized.addAll(h.getMaterializedStateHandles());
+                
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
             }
         }
+
         changelogStates.clear();
+        return new ChangelogSnapshotState(
+                materialized,
+                restoredNonMaterialized,
+                stateChangelogWriter.initialSequenceNumber());
+    }
+
+    /**
+     * Initialize state materialization so that materialized data can be 
persisted durably and
+     * included into the checkpoint.
+     *
+     * <p>This method is not thread safe. It should be called either under a 
lock or through task
+     * mailbox executor.
+     *
+     * @return a tuple of - future snapshot result from the underlying state 
backend - a {@link
+     *     SequenceNumber} identifying the latest change in the changelog
+     */
+    public Optional<MaterializationRunnable> initMaterialization() throws 
Exception {
+        SequenceNumber upTo = getLastAppendedTo();
+        SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
+
+        LOG.info(
+                "Initialize Materialization. Current changelog writers last 
append to sequence number {}",
+                upTo);
+
+        if (upTo.compareTo(lastMaterializedTo) > 0) {
+
+            LOG.info("Starting materialization from {} : {}", 
lastMaterializedTo, upTo);
+
+            return Optional.of(
+                    new MaterializationRunnable(
+                            keyedStateBackend.snapshot(
+                                    // This ID is not needed for 
materialization;
+                                    // But since we are re-using the 
streamFactory
+                                    // that is designed for state backend 
snapshot,
+                                    // which requires unique checkpoint ID.
+                                    // A faked materialized Id is provided 
here.
+                                    // TODO: implement its own streamFactory.
+                                    materializedId++,
+                                    System.currentTimeMillis(),

Review comment:
       WDYT about using `materializedId++` here to avoid unnecessary system 
call?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -121,7 +124,8 @@
                                         ttlTimeProvider,
                                         metricGroup,
                                         baseHandles,
-                                        cancelStreamRegistry));
+                                        cancelStreamRegistry),
+                cancelStreamRegistry);

Review comment:
       This argument is unused (ditto 2nd constructor).

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+/** A collection of all configuration options that relate to changelog. */
+@PublicEvolving
+public class StateChangelogOptions {
+
+    @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+    public static final ConfigOption<Duration> 
PERIODIC_MATERIALIZATION_INTERVAL =
+            
ConfigOptions.key("state.backend.changelog.periodic-materialize.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription(
+                            "Defines the interval in milliseconds to perform "
+                                    + "periodic materialization for state 
backend.");
+
+    @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+    public static final ConfigOption<Integer> 
MATERIALIZATION_MAX_FAILURES_ALLOWED =
+            ConfigOptions.key("state.backend.changelog.max-failures-allowed")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("Max number of consecutive 
materialization failures allowed.");
+
+    /** Whether to enable state change log. */
+    @Documentation.Section(value = 
Documentation.Sections.STATE_BACKEND_CHANGELOG)
+    public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
+            ConfigOptions.key("state.backend.changelog.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable state backend to write state 
changes to StateChangelog. "
+                                    + "If this config is not set explicitly, 
it means no preference "
+                                    + "for enabling the change log, and the 
value in lower config "
+                                    + "level will take effect. The default 
value 'false' here means "
+                                    + "if no value set (job or cluster), the 
change log will not be "
+                                    + "enabled.");
+
+    /**
+     * Which storage to use to store state changelog.
+     *
+     * <p>Recognized shortcut name is 'memory' from {@code
+     * InMemoryStateChangelogStorageFactory.getIdentifier()}, which is also 
the default value.
+     */
+    @Documentation.Section(value = 
Documentation.Sections.STATE_BACKEND_CHANGELOG)
+    public static final ConfigOption<String> STATE_CHANGE_LOG_STORAGE =
+            ConfigOptions.key("state.backend.changelog.storage")
+                    .stringType()
+                    .defaultValue("memory")
+                    .withDescription(
+                            Description.builder()
+                                    .text("The storage to be used to store 
state changelog.")
+                                    .linebreak()
+                                    .text(
+                                            "The implementation can be 
specified via their"
+                                                    + " shortcut name.")
+                                    .linebreak()
+                                    .text(
+                                            "The list of recognized shortcut 
names currently includes"
+                                                    + " 'memory' only.")

Review comment:
       and 'filesystem'?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
##########
@@ -76,4 +121,155 @@ public static TestTaskStateManager 
createTaskStateManager(File changelogStorageP
                                 Path.fromLocalFile(changelogStoragePath), 
false, 1024))
                 .build();
     }
+
+    public static void testMaterializedRestore(
+            StateBackend stateBackend, Environment env, 
CheckpointStreamFactory streamFactory)
+            throws Exception {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+        TypeInformation<StateBackendTestBase.TestPojo> pojoType =
+                new GenericTypeInfo<>(StateBackendTestBase.TestPojo.class);
+        ValueStateDescriptor<StateBackendTestBase.TestPojo> kvId =
+                new ValueStateDescriptor<>("id", pojoType);
+
+        ChangelogKeyedStateBackend<Integer> keyedBackend =
+                (ChangelogKeyedStateBackend<Integer>) 
createKeyedBackend(stateBackend, env);
+
+        CompletableFuture<Void> asyncComplete = new CompletableFuture<>();
+        PeriodicMaterializationManager periodicMaterializationManager =
+                new PeriodicMaterializationManager(
+                        checkNotNull(env.getMainMailboxExecutor()),
+                        checkNotNull(env.getAsyncOperationsThreadPool()),
+                        env.getTaskInfo().getTaskNameWithSubtasks(),
+                        (message, exception) -> 
asyncComplete.completeExceptionally(exception),
+                        keyedBackend,
+                        10,
+                        1);
+
+        periodicMaterializationManager.start();
+
+        try {
+            ValueState<StateBackendTestBase.TestPojo> state =
+                    keyedBackend.getPartitionedState(
+                            VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+            keyedBackend.setCurrentKey(1);
+            state.update(new StateBackendTestBase.TestPojo("u1", 1));
+
+            keyedBackend.setCurrentKey(2);
+            state.update(new StateBackendTestBase.TestPojo("u2", 2));
+
+            awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+            keyedBackend.setCurrentKey(2);
+            state.update(new StateBackendTestBase.TestPojo("u2", 22));
+
+            keyedBackend.setCurrentKey(3);
+            state.update(new StateBackendTestBase.TestPojo("u3", 3));
+
+            awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+            KeyedStateHandle snapshot =
+                    runSnapshot(
+                            keyedBackend.snapshot(
+                                    682375462378L,
+                                    2,
+                                    streamFactory,
+                                    
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                            sharedStateRegistry);
+
+            IOUtils.closeQuietly(keyedBackend);
+            keyedBackend.dispose();
+            periodicMaterializationManager.close();
+
+            // make sure the asycn phase completes successfully
+            assertFalse(asyncComplete.isCompletedExceptionally());

Review comment:
       Just calling `asyncComplete.get()` would show the actual failure reason, 
so I'd remove the assertion here.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {

Review comment:
       Can be package-private.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
##########
@@ -76,4 +121,155 @@ public static TestTaskStateManager 
createTaskStateManager(File changelogStorageP
                                 Path.fromLocalFile(changelogStoragePath), 
false, 1024))
                 .build();
     }
+
+    public static void testMaterializedRestore(
+            StateBackend stateBackend, Environment env, 
CheckpointStreamFactory streamFactory)
+            throws Exception {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+        TypeInformation<StateBackendTestBase.TestPojo> pojoType =
+                new GenericTypeInfo<>(StateBackendTestBase.TestPojo.class);
+        ValueStateDescriptor<StateBackendTestBase.TestPojo> kvId =
+                new ValueStateDescriptor<>("id", pojoType);
+
+        ChangelogKeyedStateBackend<Integer> keyedBackend =
+                (ChangelogKeyedStateBackend<Integer>) 
createKeyedBackend(stateBackend, env);
+
+        CompletableFuture<Void> asyncComplete = new CompletableFuture<>();
+        PeriodicMaterializationManager periodicMaterializationManager =
+                new PeriodicMaterializationManager(
+                        checkNotNull(env.getMainMailboxExecutor()),
+                        checkNotNull(env.getAsyncOperationsThreadPool()),
+                        env.getTaskInfo().getTaskNameWithSubtasks(),
+                        (message, exception) -> 
asyncComplete.completeExceptionally(exception),
+                        keyedBackend,
+                        10,
+                        1);
+
+        periodicMaterializationManager.start();
+
+        try {
+            ValueState<StateBackendTestBase.TestPojo> state =
+                    keyedBackend.getPartitionedState(
+                            VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+            keyedBackend.setCurrentKey(1);
+            state.update(new StateBackendTestBase.TestPojo("u1", 1));
+
+            keyedBackend.setCurrentKey(2);
+            state.update(new StateBackendTestBase.TestPojo("u2", 2));
+
+            awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+            keyedBackend.setCurrentKey(2);
+            state.update(new StateBackendTestBase.TestPojo("u2", 22));
+
+            keyedBackend.setCurrentKey(3);
+            state.update(new StateBackendTestBase.TestPojo("u3", 3));
+
+            awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
+
+            KeyedStateHandle snapshot =
+                    runSnapshot(
+                            keyedBackend.snapshot(
+                                    682375462378L,
+                                    2,
+                                    streamFactory,
+                                    
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                            sharedStateRegistry);
+
+            IOUtils.closeQuietly(keyedBackend);
+            keyedBackend.dispose();
+            periodicMaterializationManager.close();

Review comment:
       Aren't we doing this twice: here and in `finally`?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -203,27 +208,55 @@ public StateBackend configure(ReadableConfig config, 
ClassLoader classLoader)
             KeyGroupRange keyGroupRange,
             TtlTimeProvider ttlTimeProvider,
             Collection<KeyedStateHandle> stateHandles,
-            BaseBackendBuilder<K> baseBackendBuilder)
+            BaseBackendBuilder<K> baseBackendBuilder,
+            CloseableRegistry cancelStreamRegistry)
             throws Exception {
         StateChangelogStorage<?> changelogStorage =
                 Preconditions.checkNotNull(
                         env.getTaskStateManager().getStateChangelogStorage(),
                         "Changelog storage is null when creating and restoring"
                                 + " the ChangelogKeyedStateBackend.");
-        return ChangelogBackendRestoreOperation.restore(
-                changelogStorage.createReader(),
-                env.getUserCodeClassLoader().asClassLoader(),
-                castHandles(stateHandles),
-                baseBackendBuilder,
-                (baseBackend, baseState) ->
-                        new ChangelogKeyedStateBackend(
-                                baseBackend,
-                                env.getExecutionConfig(),
-                                ttlTimeProvider,
-                                
changelogStorage.createWriter(operatorIdentifier, keyGroupRange),
-                                baseState,
-                                env.getMainMailboxExecutor(),
-                                env.getAsyncOperationsThreadPool()));
+
+        String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
+        ExecutionConfig executionConfig = env.getExecutionConfig();
+
+        ChangelogKeyedStateBackend<K> keyedStateBackend =
+                ChangelogBackendRestoreOperation.restore(
+                        changelogStorage.createReader(),
+                        env.getUserCodeClassLoader().asClassLoader(),
+                        castHandles(stateHandles),
+                        baseBackendBuilder,
+                        (baseBackend, baseState) ->
+                                new ChangelogKeyedStateBackend(
+                                        baseBackend,
+                                        subtaskName,
+                                        executionConfig,
+                                        ttlTimeProvider,
+                                        changelogStorage.createWriter(
+                                                operatorIdentifier, 
keyGroupRange),
+                                        baseState,
+                                        env.getCheckpointStorageAccess()));
+
+        PeriodicMaterializationManager periodicMaterializationManager =
+                new PeriodicMaterializationManager(
+                        checkNotNull(env.getMainMailboxExecutor()),
+                        checkNotNull(env.getAsyncOperationsThreadPool()),
+                        subtaskName,
+                        (message, exception) ->
+                                env.failExternally(new 
AsynchronousException(message, exception)),
+                        keyedStateBackend,
+                        executionConfig.getPeriodicMaterializeIntervalMillis(),
+                        
executionConfig.getMaterializationMaxAllowedFailures());
+
+        // keyedStateBackend is responsible to close 
periodicMaterializationManager
+        // This indicates periodicMaterializationManager binds to the 
keyedStateBackend
+        // However PeriodicMaterializationManager can not be part of 
keyedStateBackend
+        // because of

Review comment:
       Unfinished comment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to