rkhachatryan commented on a change in pull request #13797:
URL: https://github.com/apache/flink/pull/13797#discussion_r562165994



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageFactory.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.io.IOException;
+
+/**
+ * A factory to create a specific {@link CheckpointStorage}. The storage creation gets a
+ * configuration object that can be used to read further config values.
+ *
+ * <p>The checkpoint storage factory is typically specified in the configuration to produce a
+ * configured storage backend.
+ *
+ * @param <T> The type of the checkpoint storage created.
+ */
+@PublicEvolving
+public interface CheckpointStorageFactory<T extends CheckpointStorage> {
+
+    /**
+     * Creates the checkpoint storage, optionally using the given configuration.
+     *
+     * @param config The Flink configuration (loaded by the TaskManager).
+     * @param classLoader The class loader that should be used to load the checkpoint storage.
+     * @return The created checkpoint storage.
+     * @throws IllegalConfigurationException If the configuration misses critical values, or
+     *     specifies invalid values
+     * @throws IOException If the checkpoint storage initialization failed due to an I/O exception.
+     */
+    T createFromConfig(ReadableConfig config, ClassLoader classLoader)
+            throws IllegalConfigurationException, IOException;

Review comment:
       IIUC, this method only creates a "config" object and doesn't perform any I/O.
   If so, it shouldn't declare `IOException`.
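
   A minimal sketch of the signature without the checked exception. `Storage`, `StorageFactory`, the `Map`-based config, and the `checkpoint.dir` key are stand-ins invented for this example, not Flink types or options:

```java
import java.util.Map;

final class FactorySignatureSketch {

    /** Stand-in for CheckpointStorage (hypothetical, for illustration only). */
    interface Storage {}

    /** Stand-in for CheckpointStorageFactory with no checked exception declared. */
    interface StorageFactory<T extends Storage> {
        T createFromConfig(Map<String, String> config, ClassLoader classLoader);
    }

    /** A storage that only remembers its configured directory; no I/O happens here. */
    static final class DirStorage implements Storage {
        final String dir;

        DirStorage(String dir) {
            this.dir = dir;
        }
    }

    /** Factory that reads one (hypothetical) key and rejects a missing value. */
    static final class DirStorageFactory implements StorageFactory<DirStorage> {
        @Override
        public DirStorage createFromConfig(Map<String, String> config, ClassLoader classLoader) {
            String dir = config.get("checkpoint.dir");
            if (dir == null) {
                // Unchecked failure for a configuration error; no I/O handling needed by callers.
                throw new IllegalArgumentException("checkpoint.dir is not set");
            }
            return new DirStorage(dir);
        }
    }
}
```

   With nothing checked declared, callers that only build the storage object need no I/O handling, assuming creation really performs no I/O.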

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableCheckpointStorage.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+
+/**
+ * An interface for checkpoint storage types that pick up additional parameters from a
+ * configuration.
+ */
+@Internal
+public interface ConfigurableCheckpointStorage {

Review comment:
       IIUC, implementors are always `CheckpointStorage` instances.
   If so, it would be clearer to me if this interface extended `CheckpointStorage`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from configurations. */
+public class CheckpointStorageLoader {

Review comment:
       Is this class missing `@PublicEvolving`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoadingTest.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertNull;
+
+/** This test validates that checkpoint storage is properly loaded from configuration. */
+public class CheckpointStorageLoadingTest {
+
+    private final ClassLoader cl = getClass().getClassLoader();
+
+    @Test
+    public void testNoCheckpointStorageDefined() throws Exception {
+        assertNull(
+                CheckpointStorageLoader.loadCheckpointStorageFromConfig(
+                        new Configuration(), cl, null));
+    }
+
+    @Test
+    public void testLegacyStateBackendTakesPrecedence() throws Exception {
+        StateBackend legacy = new LegacyStateBackend();
+        CheckpointStorage storage = new MockStorage();
+
+        CheckpointStorage configured =
+                CheckpointStorageLoader.fromApplicationOrConfigOrDefault(
+                        storage, legacy, new Configuration(), cl, null);
+
+        Assert.assertEquals(
+                "Legacy state backends should always take precedence", legacy, configured);
+    }
+
+    @Test
+    public void testModernStateBackendDoesNotTakePrecedence() throws Exception {
+        StateBackend legacy = new ModernStateBackend();

Review comment:
       Is this actually a `modern` backend? The variable is named `legacy`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoadingTest.java
##########
@@ -0,0 +1,228 @@
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertNull;
+
+/** This test validates that checkpoint storage is properly loaded from configuration. */
+public class CheckpointStorageLoadingTest {
+
+    private final ClassLoader cl = getClass().getClassLoader();
+
+    @Test
+    public void testNoCheckpointStorageDefined() throws Exception {
+        assertNull(
+                CheckpointStorageLoader.loadCheckpointStorageFromConfig(
+                        new Configuration(), cl, null));
+    }
+
+    @Test
+    public void testLegacyStateBackendTakesPrecedence() throws Exception {
+        StateBackend legacy = new LegacyStateBackend();
+        CheckpointStorage storage = new MockStorage();
+
+        CheckpointStorage configured =
+                CheckpointStorageLoader.fromApplicationOrConfigOrDefault(
+                        storage, legacy, new Configuration(), cl, null);
+
+        Assert.assertEquals(
+                "Legacy state backends should always take precedence", legacy, configured);
+    }
+
+    @Test
+    public void testModernStateBackendDoesNotTakePrecedence() throws Exception {
+        StateBackend legacy = new ModernStateBackend();
+        CheckpointStorage storage = new MockStorage();
+
+        CheckpointStorage configured =
+                CheckpointStorageLoader.fromApplicationOrConfigOrDefault(
+                        storage, legacy, new Configuration(), cl, null);
+
+        Assert.assertEquals(
+                "Modern state backends should never take precedence", storage, configured);
+    }
+
+    @Test
+    public void testLoadingFromFactory() throws Exception {
+        final Configuration config = new Configuration();
+
+        config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, WorkingFactory.class.getName());
+        CheckpointStorage storage =
+                CheckpointStorageLoader.fromApplicationOrConfigOrDefault(
+                        null, new ModernStateBackend(), config, cl, null);
+        Assert.assertThat(storage, Matchers.instanceOf(MockStorage.class));
+    }
+
+    @Test
+    public void testLoadingFails() throws Exception {
+        final Configuration config = new Configuration();
+
+        config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, "does.not.exist");
+        try {
+            CheckpointStorageLoader.fromApplicationOrConfigOrDefault(
+                    null, new ModernStateBackend(), config, cl, null);
+            Assert.fail("should fail with exception");
+        } catch (DynamicCodeLoadingException e) {
+            // expected
+        }
+
+        // try a class that is not a factory
+        config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, java.io.File.class.getName());
+        try {
+            CheckpointStorageLoader.fromApplicationOrConfigOrDefault(
+                    null, new ModernStateBackend(), config, cl, null);
+            Assert.fail("should fail with exception");
+        } catch (DynamicCodeLoadingException e) {
+            // expected
+        }
+
+        // try a factory that fails
+        config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, FailingFactory.class.getName());
+        try {
+            CheckpointStorageLoader.fromApplicationOrConfigOrDefault(
+                    null, new ModernStateBackend(), config, cl, null);
+            Assert.fail("should fail with exception");
+        } catch (IOException e) {
+            // expected
+        }
+    }
+
+    // A state backend that also implements checkpoint storage.
+    static final class LegacyStateBackend implements StateBackend, CheckpointStorage {
+        @Override
+        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer)
+                throws IOException {
+            return null;
+        }
+
+        @Override
+        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
+            return null;
+        }
+
+        @Override
+        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
+                Environment env,
+                JobID jobID,
+                String operatorIdentifier,
+                TypeSerializer<K> keySerializer,
+                int numberOfKeyGroups,
+                KeyGroupRange keyGroupRange,
+                TaskKvStateRegistry kvStateRegistry,
+                TtlTimeProvider ttlTimeProvider,
+                MetricGroup metricGroup,
+                Collection<KeyedStateHandle> stateHandles,
+                CloseableRegistry cancelStreamRegistry)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public OperatorStateBackend createOperatorStateBackend(
+                Environment env,
+                String operatorIdentifier,
+                Collection<OperatorStateHandle> stateHandles,
+                CloseableRegistry cancelStreamRegistry)
+                throws Exception {
+            return null;
+        }
+    }
+
+    static final class ModernStateBackend implements StateBackend {
+
+        @Override
+        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
+                Environment env,
+                JobID jobID,
+                String operatorIdentifier,
+                TypeSerializer<K> keySerializer,
+                int numberOfKeyGroups,
+                KeyGroupRange keyGroupRange,
+                TaskKvStateRegistry kvStateRegistry,
+                TtlTimeProvider ttlTimeProvider,
+                MetricGroup metricGroup,
+                Collection<KeyedStateHandle> stateHandles,
+                CloseableRegistry cancelStreamRegistry)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public OperatorStateBackend createOperatorStateBackend(
+                Environment env,
+                String operatorIdentifier,
+                Collection<OperatorStateHandle> stateHandles,
+                CloseableRegistry cancelStreamRegistry)
+                throws Exception {
+            return null;
+        }
+    }
+
+    static final class MockStorage implements CheckpointStorage {
+
+        @Override
+        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer)
+                throws IOException {
+            return null;
+        }
+
+        @Override
+        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
+            return null;
+        }
+    }
+
+    static final class WorkingFactory implements CheckpointStorageFactory<MockStorage> {
+
+        @Override
+        public MockStorage createFromConfig(ReadableConfig config, ClassLoader classLoader)
+                throws IllegalConfigurationException, IOException {
+            return new MockStorage();
+        }
+    }
+
+    static final class FailingFactory implements CheckpointStorageFactory<CheckpointStorage> {
+
+        @Override
+        public CheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader)
+                throws IllegalConfigurationException, IOException {
+            throw new IOException("fail!");

Review comment:
       The interface currently declares that it throws `IOException` (I asked a question about it).
   If it stays, then I'd expect an unchecked exception thrown from here to be wrapped into an `IOException` (?).
   If so, we should throw a `RuntimeException` (or a test-specific exception) from here to check that it gets wrapped.
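
   A rough sketch of the check I have in mind, assuming the loader does wrap unchecked failures. `Factory` and `createWrapped` are hypothetical stand-ins, not the actual loader code:

```java
import java.io.IOException;

final class WrappingSketch {

    /** Stand-in for a factory whose create method declares IOException (hypothetical). */
    interface Factory {
        Object create() throws IOException;
    }

    /** Hypothetical loader step: rethrows unchecked factory failures as IOException. */
    static Object createWrapped(Factory factory) throws IOException {
        try {
            return factory.create();
        } catch (RuntimeException e) {
            // Keep the original failure as the cause so it is not lost.
            throw new IOException("Checkpoint storage factory failed", e);
        }
    }

    /** Returns true if an unchecked failure surfaces as an IOException carrying the cause. */
    static boolean uncheckedFailureIsWrapped() {
        RuntimeException failure = new RuntimeException("fail!");
        try {
            createWrapped(
                    () -> {
                        throw failure;
                    });
            return false; // no exception at all: wrapping did not happen
        } catch (IOException e) {
            return e.getCause() == failure;
        }
    }
}
```

   An `IOException` thrown by the factory itself passes through `createWrapped` unchanged, so the existing `catch (IOException e)` in the test cannot distinguish the two cases; checking the cause does.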

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoadingTest.java
##########
@@ -0,0 +1,228 @@
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertNull;
+
+/** This test validates that checkpoint storage is properly loaded from configuration. */
+public class CheckpointStorageLoadingTest {

Review comment:
       nit: rename to `CheckpointStorageLoad[er]Test`? (it's easier to find when names match)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its {@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and
+     *     the factory class was not found or the factory could not be instantiated
+     * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(
+            ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
+            throws IllegalStateException, DynamicCodeLoadingException, IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+
+        final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
+        if (storageName == null) {
+            return null;
+        }
+
+        switch (storageName.toLowerCase()) {
+            case JOB_MANAGER_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "JobManagerCheckpointStorage is not yet implemented");
+
+            case FILE_SYSTEM_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "FileSystemCheckpointStorage is not yet implemented");
+
+            default:
+                if (logger != null) {
+                    logger.info("Loading state backend via factory {}", storageName);
+                }
+
+                CheckpointStorageFactory<?> factory;
+                try {
+                    @SuppressWarnings("rawtypes")
+                    Class<? extends CheckpointStorageFactory> clazz =
+                            Class.forName(storageName, false, classLoader)
+                                    .asSubclass(CheckpointStorageFactory.class);
+
+                    factory = clazz.newInstance();
+                } catch (ClassNotFoundException e) {
+                    throw new DynamicCodeLoadingException(
+                            "Cannot find configured state backend factory class: " + storageName,
+                            e);
+                } catch (ClassCastException | InstantiationException | IllegalAccessException e) {
+                    throw new DynamicCodeLoadingException(
+                            "The class configured under '"
+                                    + CheckpointingOptions.CHECKPOINT_STORAGE.key()
+                                    + "' is not a valid checkpoint storage factory ("
+                                    + storageName
+                                    + ')',
+                            e);
+                }
+
+                return factory.createFromConfig(config, classLoader);
+        }
+    }
+
+    public static CheckpointStorage fromApplicationOrConfigOrDefault(

Review comment:
       I think the load order should be documented (javadoc). I'm assuming this is part of the public API.
   
   nit: rename to just `load`?
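
   For illustration, the order could be documented roughly as below. The first three steps are inferred from the test names and assertions in `CheckpointStorageLoadingTest`; the final default step is only assumed from the method name, and `Storage`, `Backend`, and the supplier parameters are hypothetical stand-ins:

```java
import java.util.Optional;
import java.util.function.Supplier;

final class LoadOrderSketch {

    /** Stand-in for CheckpointStorage (hypothetical). */
    interface Storage {}

    /** Stand-in for StateBackend (hypothetical). */
    interface Backend {}

    static final class SimpleStorage implements Storage {}

    static final class ModernBackend implements Backend {}

    /** A backend that is also a storage, like LegacyStateBackend in the test. */
    static final class LegacyBackend implements Backend, Storage {}

    /**
     * Picks the checkpoint storage in this order:
     *
     * <ol>
     *   <li>a state backend that is itself a storage (legacy) always wins;
     *   <li>otherwise the storage passed in by the application, if any;
     *   <li>otherwise the storage loaded from the configuration, if configured;
     *   <li>otherwise a default (assumed here; not shown in the diff).
     * </ol>
     */
    static Storage load(
            Storage fromApplication,
            Backend backend,
            Supplier<Optional<Storage>> fromConfig,
            Supplier<Storage> defaultStorage) {
        if (backend instanceof Storage) {
            return (Storage) backend;
        }
        if (fromApplication != null) {
            return fromApplication;
        }
        return fromConfig.get().orElseGet(defaultStorage);
    }
}
```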

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its {@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and
+     *     the factory class was not found or the factory could not be instantiated
+     * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(
+            ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
+            throws IllegalStateException, DynamicCodeLoadingException, IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+
+        final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
+        if (storageName == null) {
+            return null;

Review comment:
       A missing `storageName` looks like a misconfiguration. I'd expect `IllegalArgumentException` or a similar exception in this case.
   
   If not, I think it would be less error-prone to return `Optional.empty()`.
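
   A small sketch of both options, using a plain `Map` as a stand-in for the Flink config. The class and method names are hypothetical; only the key name comes from the javadoc above:

```java
import java.util.Map;
import java.util.Optional;

final class OptionalStorageNameSketch {

    /** Key name as quoted in the javadoc of this diff. */
    static final String STORAGE_KEY = "state.checkpoint-storage";

    /** Lenient variant: a missing key is a normal outcome that the caller must handle. */
    static Optional<String> findStorageName(Map<String, String> config) {
        return Optional.ofNullable(config.get(STORAGE_KEY));
    }

    /** Strict variant: a missing key is treated as a misconfiguration. */
    static String requireStorageName(Map<String, String> config) {
        return findStorageName(config)
                .orElseThrow(
                        () -> new IllegalArgumentException(STORAGE_KEY + " is not configured"));
    }
}
```

   Either way the caller can no longer forget the missing case silently, which is the risk with a bare `null`.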

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link 
CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, 
or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory 
class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its 
{@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} 
method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', 
and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the 
checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage 
factory is configured and
+     *     the factory class was not found or the factory could not be 
instantiated
+     * @throws IllegalConfigurationException May be thrown by the 
CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when 
instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(
+            ReadableConfig config, ClassLoader classLoader, @Nullable Logger 
logger)
+            throws IllegalStateException, DynamicCodeLoadingException, 
IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+
+        final String storageName = 
config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
+        if (storageName == null) {
+            return null;
+        }
+
+        switch (storageName.toLowerCase()) {
+            case JOB_MANAGER_STORAGE_NAME:
+                throw new IllegalStateException(

Review comment:
       nit: `UnsupportedOperationException`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from 
configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link 
CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, 
or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory 
class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its 
{@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} 
method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', 
and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the 
checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage 
factory is configured and
+     *     the factory class was not found or the factory could not be 
instantiated
+     * @throws IllegalConfigurationException May be thrown by the 
CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when 
instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(

Review comment:
       nit: rename to just `fromConfig`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from 
configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link 
CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, 
or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory 
class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its 
{@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} 
method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', 
and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the 
checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage 
factory is configured and
+     *     the factory class was not found or the factory could not be 
instantiated
+     * @throws IllegalConfigurationException May be thrown by the 
CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when 
instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(
+            ReadableConfig config, ClassLoader classLoader, @Nullable Logger 
logger)
+            throws IllegalStateException, DynamicCodeLoadingException, 
IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+
+        final String storageName = 
config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
+        if (storageName == null) {
+            return null;
+        }
+
+        switch (storageName.toLowerCase()) {
+            case JOB_MANAGER_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "JobManagerCheckpointStorage is not yet implemented");
+
+            case FILE_SYSTEM_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "FileSystemCheckpointStorage is not yet implemented");
+
+            default:
+                if (logger != null) {
+                    logger.info("Loading state backend via factory {}", 
storageName);
+                }
+
+                CheckpointStorageFactory<?> factory;
+                try {
+                    @SuppressWarnings("rawtypes")
+                    Class<? extends CheckpointStorageFactory> clazz =
+                            Class.forName(storageName, false, classLoader)
+                                    
.asSubclass(CheckpointStorageFactory.class);
+
+                    factory = clazz.newInstance();
+                } catch (ClassNotFoundException e) {
+                    throw new DynamicCodeLoadingException(
+                            "Cannot find configured state backend factory 
class: " + storageName,
+                            e);
+                } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
+                    throw new DynamicCodeLoadingException(
+                            "The class configured under '"
+                                    + 
CheckpointingOptions.CHECKPOINT_STORAGE.key()
+                                    + "' is not a valid checkpoint storage 
factory ("
+                                    + storageName
+                                    + ')',
+                            e);
+                }
+
+                return factory.createFromConfig(config, classLoader);
+        }
+    }
+
+    public static CheckpointStorage fromApplicationOrConfigOrDefault(
+            @Nullable CheckpointStorage fromApplication,

Review comment:
       How about `Optional` instead of `@Nullable`? (Ditto for `logger` in this 
and the second method.)
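
Roughly what I have in mind, as a sketch only: the names below are hypothetical, `String` stands in for `CheckpointStorage`, and a `Consumer<String>` stands in for the slf4j `Logger`:

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch; not the signature proposed in this PR.
final class OptionalParamsSketch {

    static String resolve(
            Optional<String> fromApplication,
            Optional<Consumer<String>> logger,
            String fallback) {
        // No explicit null checks: absence is handled through the Optional API.
        String chosen = fromApplication.orElse(fallback);
        logger.ifPresent(log -> log.accept("Using checkpoint storage " + chosen));
        return chosen;
    }
}
```

This replaces the `if (logger != null)` blocks with `ifPresent` calls; whether that reads better than `@Nullable` here is a judgment call.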

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from 
configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link 
CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, 
or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory 
class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its 
{@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} 
method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', 
and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the 
checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage 
factory is configured and
+     *     the factory class was not found or the factory could not be 
instantiated
+     * @throws IllegalConfigurationException May be thrown by the 
CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when 
instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(
+            ReadableConfig config, ClassLoader classLoader, @Nullable Logger 
logger)
+            throws IllegalStateException, DynamicCodeLoadingException, 
IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+
+        final String storageName = 
config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
+        if (storageName == null) {
+            return null;
+        }
+
+        switch (storageName.toLowerCase()) {
+            case JOB_MANAGER_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "JobManagerCheckpointStorage is not yet implemented");
+
+            case FILE_SYSTEM_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "FileSystemCheckpointStorage is not yet implemented");
+
+            default:
+                if (logger != null) {
+                    logger.info("Loading state backend via factory {}", 
storageName);
+                }
+
+                CheckpointStorageFactory<?> factory;
+                try {
+                    @SuppressWarnings("rawtypes")
+                    Class<? extends CheckpointStorageFactory> clazz =
+                            Class.forName(storageName, false, classLoader)
+                                    
.asSubclass(CheckpointStorageFactory.class);
+
+                    factory = clazz.newInstance();
+                } catch (ClassNotFoundException e) {
+                    throw new DynamicCodeLoadingException(
+                            "Cannot find configured state backend factory 
class: " + storageName,
+                            e);
+                } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
+                    throw new DynamicCodeLoadingException(
+                            "The class configured under '"
+                                    + 
CheckpointingOptions.CHECKPOINT_STORAGE.key()
+                                    + "' is not a valid checkpoint storage 
factory ("
+                                    + storageName
+                                    + ')',
+                            e);
+                }
+
+                return factory.createFromConfig(config, classLoader);
+        }
+    }
+
+    public static CheckpointStorage fromApplicationOrConfigOrDefault(
+            @Nullable CheckpointStorage fromApplication,
+            StateBackend configuredStateBackend,
+            Configuration config,
+            ClassLoader classLoader,
+            @Nullable Logger logger)
+            throws IllegalConfigurationException, DynamicCodeLoadingException, 
IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+        Preconditions.checkNotNull(configuredStateBackend, "statebackend");
+
+        // (1) Legacy state backends always take precedence
+        // for backwards compatibility.
+        if (configuredStateBackend instanceof CheckpointStorage) {
+            if (logger != null) {
+                logger.info(
+                        "Using legacy state backend {} as Job checkpoint 
storage",
+                        configuredStateBackend);
+            }
+
+            return (CheckpointStorage) configuredStateBackend;
+        }
+
+        CheckpointStorage storage;
+
+        // (2) Application defined checkpoint storage
+        if (fromApplication != null) {
+            // see if this is supposed to pick up additional configuration 
parameters
+            if (fromApplication instanceof ConfigurableCheckpointStorage) {

Review comment:
       I think the branching in this method can be flattened; furthermore, the 
method can be simplified by returning from each branch:
   ```
   if (configuredStateBackend instanceof CheckpointStorage) {
       return ...;
   } else if (fromApplication instanceof ConfigurableCheckpointStorage) {
       return ...;
   } else if (fromApplication != null) {
       return ...;
   } else {
       return ...;
   }
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from 
configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";

Review comment:
       Does this field have to be public?
   
   (ditto `FILE_SYSTEM_STORAGE_NAME`)

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
##########
@@ -468,12 +474,12 @@ private File getNextStoragePath() {
 
     @Override
     public CompletedCheckpointStorageLocation resolveCheckpoint(String 
pointer) throws IOException {
-        return checkpointStreamBackend.resolveCheckpoint(pointer);
+        return ((CheckpointStorage) 
checkpointStreamBackend).resolveCheckpoint(pointer);

Review comment:
       It seems possible to change the field type to `CheckpointStorage` and 
remove these casts.
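
A minimal sketch of the idea, with hypothetical stand-in types (`ResolverSketch` plays the role of `CheckpointStorage`). It assumes the field is only used through that interface, which I haven't verified for the rest of `RocksDBStateBackend`:

```java
// Hypothetical stand-in for CheckpointStorage.
interface ResolverSketch {
    String resolveCheckpoint(String pointer);
}

// Hypothetical stand-in for the wrapped stream backend.
final class StreamBackendSketch implements ResolverSketch {
    @Override
    public String resolveCheckpoint(String pointer) {
        return "resolved:" + pointer;
    }
}

final class BackendSketch {
    // Declared with the interface that is actually needed, so no cast at call sites.
    private final ResolverSketch checkpointStreamBackend;

    BackendSketch(ResolverSketch checkpointStreamBackend) {
        this.checkpointStreamBackend = checkpointStreamBackend;
    }

    String resolveCheckpoint(String pointer) {
        return checkpointStreamBackend.resolveCheckpoint(pointer);
    }
}
```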

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -328,6 +329,7 @@ private void 
setBatchStateBackendAndTimerService(StreamGraph graph) {
         if (useStateBackend) {
             LOG.debug("Using BATCH execution state backend and timer 
service.");
             graph.setStateBackend(new BatchExecutionStateBackend());
+            graph.setCheckpointStorage(new BatchExecutionCheckpointStorage());

Review comment:
       Should there be a similar call for other cases (line 303)?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
##########
@@ -79,6 +81,9 @@
     @JsonProperty(FIELD_NAME_STATE_BACKEND)
     private final String stateBackend;
 
+    @JsonProperty(FIELD_NAME_CHECKPOINT_STORAGE)
+    private final String checkpointStorage;

Review comment:
       I couldn't find where this field is rendered in the UI. 
   Is that implemented in a subsequent PR?





