rkhachatryan commented on a change in pull request #13797: URL: https://github.com/apache/flink/pull/13797#discussion_r562165994
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageFactory.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; + +import java.io.IOException; + +/** + * A factory to create a specific {@link CheckpointStorage}. The storage creation gets a + * configuration object that can be used to read further config values. + * + * <p>The checkpoint storage factory is typically specified in the configuration to produce a + * configured storage backend. + * + * @param <T> The type of the checkpoint storage created. + */ +@PublicEvolving +public interface CheckpointStorageFactory<T extends CheckpointStorage> { + + /** + * Creates the checkpoint storage, optionally using the given configuration. + * + * @param config The Flink configuration (loaded by the TaskManager). + * @param classLoader The clsas loader that should be used to load the checkpoint storage. + * @return The created checkpoint storage. 
+ * @throws IllegalConfigurationException If the configuration misses critical values, or + * specifies invalid values + * @throws IOException If the checkpoint storage initialization failed due to an I/O exception. + */ + T createFromConfig(ReadableConfig config, ClassLoader classLoader) + throws IllegalConfigurationException, IOException; Review comment: IIUC, this method only creates a "config" object and doesn't perform any I/O. If so, it shouldn't declare `IOException`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableCheckpointStorage.java ########## @@ -0,0 +1,49 @@ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; + +/** + * An interface for checkpoint storage types that pick up additional parameters from a + * configuration. + */ +@Internal +public interface ConfigurableCheckpointStorage { Review comment: IIUC, implementors are always `CheckpointStorage`. If so, it would be clearer to me if this interface extended `CheckpointStorage`.
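A rough sketch of how the two suggestions above might fit together: a factory that only parses config and therefore declares no `IOException`, and a `ConfigurableCheckpointStorage` that extends `CheckpointStorage`. It uses simplified stand-in types, not Flink's real API: a `Map<String, String>` replaces `ReadableConfig`, `IllegalArgumentException` replaces `IllegalConfigurationException`, and the `configure` method, `DirectoryStorage`, `DirectoryStorageFactory`, `applyConfig` and the `example.checkpoint-dir` key are all hypothetical.

```java
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical sketch using simplified stand-ins, NOT Flink's real types:
 * Map<String, String> plays ReadableConfig, IllegalArgumentException plays
 * IllegalConfigurationException.
 */
public class CheckpointStorageApiSketch {

    /** Stand-in for CheckpointStorage; its real methods are omitted. */
    interface CheckpointStorage {}

    /** Suggestion 1: the factory only parses config, so it declares no IOException. */
    interface CheckpointStorageFactory<T extends CheckpointStorage> {
        T createFromConfig(Map<String, String> config, ClassLoader classLoader);
    }

    /** Suggestion 2: every configurable storage is itself a CheckpointStorage. */
    interface ConfigurableCheckpointStorage extends CheckpointStorage {
        // Hypothetical method; the real interface's members are not shown in the diff.
        CheckpointStorage configure(Map<String, String> config, ClassLoader classLoader);
    }

    // Hypothetical config key, used only for this illustration.
    static final String DIR_KEY = "example.checkpoint-dir";

    static final class DirectoryStorage implements ConfigurableCheckpointStorage {
        final String directory;

        DirectoryStorage(String directory) {
            this.directory = directory;
        }

        @Override
        public CheckpointStorage configure(Map<String, String> config, ClassLoader classLoader) {
            // A value from the config overrides the application-provided directory.
            return new DirectoryStorage(config.getOrDefault(DIR_KEY, directory));
        }
    }

    static final class DirectoryStorageFactory
            implements CheckpointStorageFactory<DirectoryStorage> {
        @Override
        public DirectoryStorage createFromConfig(
                Map<String, String> config, ClassLoader classLoader) {
            String dir = config.get(DIR_KEY);
            if (dir == null) {
                // Invalid config surfaces as an unchecked exception; no I/O is involved.
                throw new IllegalArgumentException("Missing '" + DIR_KEY + "'");
            }
            return new DirectoryStorage(dir);
        }
    }

    /** Since ConfigurableCheckpointStorage is-a CheckpointStorage, callers stay typed. */
    static CheckpointStorage applyConfig(
            CheckpointStorage fromApp, Map<String, String> config, ClassLoader classLoader) {
        if (fromApp instanceof ConfigurableCheckpointStorage) {
            return ((ConfigurableCheckpointStorage) fromApp).configure(config, classLoader);
        }
        return fromApp;
    }

    public static void main(String[] args) {
        ClassLoader cl = CheckpointStorageApiSketch.class.getClassLoader();
        DirectoryStorage fromFactory =
                new DirectoryStorageFactory()
                        .createFromConfig(Collections.singletonMap(DIR_KEY, "/tmp/cp"), cl);
        System.out.println(fromFactory.directory); // prints /tmp/cp

        CheckpointStorage kept =
                applyConfig(new DirectoryStorage("/app"), Collections.emptyMap(), cl);
        System.out.println(((DirectoryStorage) kept).directory); // prints /app
    }
}
```

If the factory really is pure config parsing, any actual I/O (for example, creating checkpoint directories) would presumably happen later, when the storage is used, so a checked `IOException` at creation time buys little.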
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { Review comment: Is this class missing `@PublicEvolving`? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoadingTest.java ########## @@ -0,0 +1,228 @@ +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; + +import static org.junit.Assert.assertNull; + +/** This test validates that checkpoint storage is properly loaded from configuration.
*/ +public class CheckpointStorageLoadingTest { + + private final ClassLoader cl = getClass().getClassLoader(); + + @Test + public void testNoCheckpointStorageDefined() throws Exception { + assertNull( + CheckpointStorageLoader.loadCheckpointStorageFromConfig( + new Configuration(), cl, null)); + } + + @Test + public void testLegacyStateBackendTakesPrecedence() throws Exception { + StateBackend legacy = new LegacyStateBackend(); + CheckpointStorage storage = new MockStorage(); + + CheckpointStorage configured = + CheckpointStorageLoader.fromApplicationOrConfigOrDefault( + storage, legacy, new Configuration(), cl, null); + + Assert.assertEquals( + "Legacy state backends should always take precendence", legacy, configured); + } + + @Test + public void testModernStateBackendDoesNotTakePrecedence() throws Exception { + StateBackend legacy = new ModernStateBackend(); Review comment: Is this actually a `modern` backend? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoadingTest.java ########## @@ -0,0 +1,228 @@ + @Test + public void testModernStateBackendDoesNotTakePrecedence() throws Exception { + StateBackend legacy = new ModernStateBackend(); + CheckpointStorage storage = new MockStorage(); + + CheckpointStorage configured = + CheckpointStorageLoader.fromApplicationOrConfigOrDefault( + storage, legacy, new Configuration(), cl, null); + + Assert.assertEquals( + "Modern state backends should never take precendence", storage, configured); + } + + @Test + public void testLoadingFromFactory() throws Exception { + final Configuration config = new Configuration(); + + config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, WorkingFactory.class.getName()); + CheckpointStorage storage = + CheckpointStorageLoader.fromApplicationOrConfigOrDefault( + null, new ModernStateBackend(), config, cl, null); + Assert.assertThat(storage, Matchers.instanceOf(MockStorage.class)); + } + + @Test + public void testLoadingFails() throws Exception { + final Configuration config = new Configuration(); + + config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, "does.not.exist"); + try { + CheckpointStorageLoader.fromApplicationOrConfigOrDefault( + null, new ModernStateBackend(), config, cl, null); + Assert.fail("should fail with exception"); + } catch
(DynamicCodeLoadingException e) { + // expected + } + + // try a class that is not a factory + config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, java.io.File.class.getName()); + try { + CheckpointStorageLoader.fromApplicationOrConfigOrDefault( + null, new ModernStateBackend(), config, cl, null); + Assert.fail("should fail with exception"); + } catch (DynamicCodeLoadingException e) { + // expected + } + + // try a factory that fails + config.setString(CheckpointingOptions.CHECKPOINT_STORAGE, FailingFactory.class.getName()); + try { + CheckpointStorageLoader.fromApplicationOrConfigOrDefault( + null, new ModernStateBackend(), config, cl, null); + Assert.fail("should fail with exception"); + } catch (IOException e) { + // expected + } + } + + // A state backend that also implements checkpoint storage. + static final class LegacyStateBackend implements StateBackend, CheckpointStorage { + @Override + public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) + throws IOException { + return null; + } + + @Override + public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException { + return null; + } + + @Override + public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider, + MetricGroup metricGroup, + Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) + throws Exception { + return null; + } + + @Override + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier, + Collection<OperatorStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) + throws Exception { + return null; + } + } + + static final class ModernStateBackend implements StateBackend { + + @Override + public <K> 
CheckpointableKeyedStateBackend<K> createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider, + MetricGroup metricGroup, + Collection<KeyedStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) + throws Exception { + return null; + } + + @Override + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier, + Collection<OperatorStateHandle> stateHandles, + CloseableRegistry cancelStreamRegistry) + throws Exception { + return null; + } + } + + static final class MockStorage implements CheckpointStorage { + + @Override + public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) + throws IOException { + return null; + } + + @Override + public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException { + return null; + } + } + + static final class WorkingFactory implements CheckpointStorageFactory<MockStorage> { + + @Override + public MockStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) + throws IllegalConfigurationException, IOException { + return new MockStorage(); + } + } + + static final class FailingFactory implements CheckpointStorageFactory<CheckpointStorage> { + + @Override + public CheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) + throws IllegalConfigurationException, IOException { + throw new IOException("fail!"); Review comment: The interface currently declares that it throws `IOException` (I asked a question about that above). If that stays, I'd expect an unchecked exception thrown from here to be wrapped into an `IOException` by the loader (?). If so, we should throw a `RuntimeException` (or a dedicated test exception) from here to check that it gets wrapped. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoadingTest.java ########## @@ -0,0 +1,228 @@ +/** This test validates that checkpoint storage is properly loaded from configuration. */ +public class CheckpointStorageLoadingTest { Review comment: nit: rename to `CheckpointStorageLoad[er]Test`? (It's easier to find when the names match.) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +/** This class contains utility methods to load checkpoint storage from configurations.
*/ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; + + public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + + /** + * Loads the checkpoint storage from the configuration, from the parameter + * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified, + * the factory is instantiated (via its zero-argument constructor) and its {@link + * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. + * + * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value + * #FILE_SYSTEM_STORAGE_NAME}'. + * + * @param config The configuration to load the checkpoint storage from + * @param classLoader The class loader that should be used to load the checkpoint storage + * @param logger Optionally, a logger to log actions to (may be null) + * @return The instantiated checkpoint storage. 
+ * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and + * the factory class was not found or the factory could not be instantiated + * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when + * creating / configuring the checkpoint storage in the factory + * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the + * checkpoint storage + */ + public static CheckpointStorage loadCheckpointStorageFromConfig( + ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) + throws IllegalStateException, DynamicCodeLoadingException, IOException { + + Preconditions.checkNotNull(config, "config"); + Preconditions.checkNotNull(classLoader, "classLoader"); + + final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE); + if (storageName == null) { + return null; + } + + switch (storageName.toLowerCase()) { + case JOB_MANAGER_STORAGE_NAME: + throw new IllegalStateException( + "JobManagerCheckpointStorage is not yet implemented"); + + case FILE_SYSTEM_STORAGE_NAME: + throw new IllegalStateException( + "FileSystemCheckpointStorage is not yet implemented"); + + default: + if (logger != null) { + logger.info("Loading state backend via factory {}", storageName); + } + + CheckpointStorageFactory<?> factory; + try { + @SuppressWarnings("rawtypes") + Class<? 
extends CheckpointStorageFactory> clazz = + Class.forName(storageName, false, classLoader) + .asSubclass(CheckpointStorageFactory.class); + + factory = clazz.newInstance(); + } catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured state backend factory class: " + storageName, + e); + } catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException( + "The class configured under '" + + CheckpointingOptions.CHECKPOINT_STORAGE.key() + + "' is not a valid checkpoint storage factory (" + + storageName + + ')', + e); + } + + return factory.createFromConfig(config, classLoader); + } + } + + public static CheckpointStorage fromApplicationOrConfigOrDefault( Review comment: I think the load order should be documented (javadoc). I'm assuming this is a part of public API. nit: rename to just `load`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; + + public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + + /** + * Loads the checkpoint storage from the configuration, from the parameter + * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified, + * the factory is instantiated (via its zero-argument constructor) and its {@link + * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. + * + * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value + * #FILE_SYSTEM_STORAGE_NAME}'. + * + * @param config The configuration to load the checkpoint storage from + * @param classLoader The class loader that should be used to load the checkpoint storage + * @param logger Optionally, a logger to log actions to (may be null) + * @return The instantiated checkpoint storage. 
+ * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and + * the factory class was not found or the factory could not be instantiated + * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when + * creating / configuring the checkpoint storage in the factory + * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the + * checkpoint storage + */ + public static CheckpointStorage loadCheckpointStorageFromConfig( + ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) + throws IllegalStateException, DynamicCodeLoadingException, IOException { + + Preconditions.checkNotNull(config, "config"); + Preconditions.checkNotNull(classLoader, "classLoader"); + + final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE); + if (storageName == null) { + return null; Review comment: Missing `storageName` looks like a misconfiguration. I'd expect `IllegalArgumentException` or a similar exception in this case. If not, I think it would be less error-prone to return `Optional.empty`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; + + public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + + /** + * Loads the checkpoint storage from the configuration, from the parameter + * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified, + * the factory is instantiated (via its zero-argument constructor) and its {@link + * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. + * + * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value + * #FILE_SYSTEM_STORAGE_NAME}'. + * + * @param config The configuration to load the checkpoint storage from + * @param classLoader The class loader that should be used to load the checkpoint storage + * @param logger Optionally, a logger to log actions to (may be null) + * @return The instantiated checkpoint storage. 
+ * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and + * the factory class was not found or the factory could not be instantiated + * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when + * creating / configuring the checkpoint storage in the factory + * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the + * checkpoint storage + */ + public static CheckpointStorage loadCheckpointStorageFromConfig( + ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) + throws IllegalStateException, DynamicCodeLoadingException, IOException { + + Preconditions.checkNotNull(config, "config"); + Preconditions.checkNotNull(classLoader, "classLoader"); + + final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE); + if (storageName == null) { + return null; + } + + switch (storageName.toLowerCase()) { + case JOB_MANAGER_STORAGE_NAME: + throw new IllegalStateException( Review comment: nit: `UnsupportedOperationException`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; + + public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + + /** + * Loads the checkpoint storage from the configuration, from the parameter + * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified, + * the factory is instantiated (via its zero-argument constructor) and its {@link + * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. + * + * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value + * #FILE_SYSTEM_STORAGE_NAME}'. + * + * @param config The configuration to load the checkpoint storage from + * @param classLoader The class loader that should be used to load the checkpoint storage + * @param logger Optionally, a logger to log actions to (may be null) + * @return The instantiated checkpoint storage. 
+ * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and + * the factory class was not found or the factory could not be instantiated + * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when + * creating / configuring the checkpoint storage in the factory + * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the + * checkpoint storage + */ + public static CheckpointStorage loadCheckpointStorageFromConfig( Review comment: nit: rename to just `fromConfig`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; + + public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + + /** + * Loads the checkpoint storage from the configuration, from the parameter + * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified, + * the factory is instantiated (via its zero-argument constructor) and its {@link + * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. + * + * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value + * #FILE_SYSTEM_STORAGE_NAME}'. + * + * @param config The configuration to load the checkpoint storage from + * @param classLoader The class loader that should be used to load the checkpoint storage + * @param logger Optionally, a logger to log actions to (may be null) + * @return The instantiated checkpoint storage.
+ * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and + * the factory class was not found or the factory could not be instantiated + * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when + * creating / configuring the checkpoint storage in the factory + * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the + * checkpoint storage + */ + public static CheckpointStorage loadCheckpointStorageFromConfig( + ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) + throws IllegalStateException, DynamicCodeLoadingException, IOException { + + Preconditions.checkNotNull(config, "config"); + Preconditions.checkNotNull(classLoader, "classLoader"); + + final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE); + if (storageName == null) { + return null; + } + + switch (storageName.toLowerCase()) { + case JOB_MANAGER_STORAGE_NAME: + throw new IllegalStateException( + "JobManagerCheckpointStorage is not yet implemented"); + + case FILE_SYSTEM_STORAGE_NAME: + throw new IllegalStateException( + "FileSystemCheckpointStorage is not yet implemented"); + + default: + if (logger != null) { + logger.info("Loading state backend via factory {}", storageName); + } + + CheckpointStorageFactory<?> factory; + try { + @SuppressWarnings("rawtypes") + Class<? 
extends CheckpointStorageFactory> clazz = + Class.forName(storageName, false, classLoader) + .asSubclass(CheckpointStorageFactory.class); + + factory = clazz.newInstance(); + } catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured state backend factory class: " + storageName, + e); + } catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException( + "The class configured under '" + + CheckpointingOptions.CHECKPOINT_STORAGE.key() + + "' is not a valid checkpoint storage factory (" + + storageName + + ')', + e); + } + + return factory.createFromConfig(config, classLoader); + } + } + + public static CheckpointStorage fromApplicationOrConfigOrDefault( + @Nullable CheckpointStorage fromApplication, Review comment: How about `Optional` instead of `@Nullable`? (ditto logger in this and 2nd method). ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; + + public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + + /** + * Loads the checkpoint storage from the configuration, from the parameter + * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified, + * the factory is instantiated (via its zero-argument constructor) and its {@link + * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. + * + * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value + * #FILE_SYSTEM_STORAGE_NAME}'. + * + * @param config The configuration to load the checkpoint storage from + * @param classLoader The class loader that should be used to load the checkpoint storage + * @param logger Optionally, a logger to log actions to (may be null) + * @return The instantiated checkpoint storage.
+ * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and + * the factory class was not found or the factory could not be instantiated + * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when + * creating / configuring the checkpoint storage in the factory + * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the + * checkpoint storage + */ + public static CheckpointStorage loadCheckpointStorageFromConfig( + ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) + throws IllegalStateException, DynamicCodeLoadingException, IOException { + + Preconditions.checkNotNull(config, "config"); + Preconditions.checkNotNull(classLoader, "classLoader"); + + final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE); + if (storageName == null) { + return null; + } + + switch (storageName.toLowerCase()) { + case JOB_MANAGER_STORAGE_NAME: + throw new IllegalStateException( + "JobManagerCheckpointStorage is not yet implemented"); + + case FILE_SYSTEM_STORAGE_NAME: + throw new IllegalStateException( + "FileSystemCheckpointStorage is not yet implemented"); + + default: + if (logger != null) { + logger.info("Loading state backend via factory {}", storageName); + } + + CheckpointStorageFactory<?> factory; + try { + @SuppressWarnings("rawtypes") + Class<? 
extends CheckpointStorageFactory> clazz = + Class.forName(storageName, false, classLoader) + .asSubclass(CheckpointStorageFactory.class); + + factory = clazz.newInstance(); + } catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured state backend factory class: " + storageName, + e); + } catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException( + "The class configured under '" + + CheckpointingOptions.CHECKPOINT_STORAGE.key() + + "' is not a valid checkpoint storage factory (" + + storageName + + ')', + e); + } + + return factory.createFromConfig(config, classLoader); + } + } + + public static CheckpointStorage fromApplicationOrConfigOrDefault( + @Nullable CheckpointStorage fromApplication, + StateBackend configuredStateBackend, + Configuration config, + ClassLoader classLoader, + @Nullable Logger logger) + throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { + + Preconditions.checkNotNull(config, "config"); + Preconditions.checkNotNull(classLoader, "classLoader"); + Preconditions.checkNotNull(configuredStateBackend, "statebackend"); + + // (1) Legacy state backends always take precedence + // for backwards compatibility. 
+ if (configuredStateBackend instanceof CheckpointStorage) { + if (logger != null) { + logger.info( + "Using legacy state backend {} as Job checkpoint storage", + configuredStateBackend); + } + + return (CheckpointStorage) configuredStateBackend; + } + + CheckpointStorage storage; + + // (2) Application defined checkpoint storage + if (fromApplication != null) { + // see if this is supposed to pick up additional configuration parameters + if (fromApplication instanceof ConfigurableCheckpointStorage) { Review comment: I think the branching in this method can be flattened; furthermore, the method can be simplified by returning from each branch:

```
if (configuredStateBackend instanceof CheckpointStorage) {
    return ...;
} else if (fromApplication instanceof ConfigurableCheckpointStorage) {
    return ...;
} else if (fromApplication != null) {
    return ...;
} else {
    return ...;
}
```

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; Review comment: Does this field have to be public? (ditto `FILE_SYSTEM_STORAGE_NAME`) ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ########## @@ -468,12 +474,12 @@ private File getNextStoragePath() { @Override public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException { - return checkpointStreamBackend.resolveCheckpoint(pointer); + return ((CheckpointStorage) checkpointStreamBackend).resolveCheckpoint(pointer); Review comment: It seems possible to change the field type to `CheckpointStorage` and remove these casts. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ########## @@ -328,6 +329,7 @@ private void setBatchStateBackendAndTimerService(StreamGraph graph) { if (useStateBackend) { LOG.debug("Using BATCH execution state backend and timer service."); graph.setStateBackend(new BatchExecutionStateBackend()); + graph.setCheckpointStorage(new BatchExecutionCheckpointStorage()); Review comment: Should there be a similar call for other cases (line 303)?
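The early-return structure proposed in the review comment on `fromApplicationOrConfigOrDefault` can be sketched as a small, self-contained Java program. This is an illustration under stated assumptions, not the PR's code: `CheckpointStorage`, `StateBackend`, and `ConfigurableCheckpointStorage` are reduced to hypothetical stand-in interfaces, `configure` is simplified to take a `Map` rather than Flink's `ReadableConfig` and `ClassLoader`, logging is omitted, and the last branch delegates to a hypothetical `fromConfigOrDefault` supplier because the rest of the original method is not shown in the hunk.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-ins for the Flink types in the review, reduced to what this sketch needs.
interface CheckpointStorage {}

interface StateBackend {}

interface ConfigurableCheckpointStorage extends CheckpointStorage {
    // Simplified assumption: the real Flink method reads a ReadableConfig and a ClassLoader.
    CheckpointStorage configure(Map<String, String> config);
}

final class CheckpointStorageLoaderSketch {

    private CheckpointStorageLoaderSketch() {}

    /** Resolves checkpoint storage with one early return per branch, in the review's order. */
    static CheckpointStorage fromApplicationOrConfigOrDefault(
            CheckpointStorage fromApplication, // may be null, as in the original signature
            StateBackend configuredStateBackend,
            Map<String, String> config,
            Supplier<CheckpointStorage> fromConfigOrDefault) { // hypothetical fallback hook

        Objects.requireNonNull(configuredStateBackend, "statebackend");
        Objects.requireNonNull(config, "config");

        if (configuredStateBackend instanceof CheckpointStorage) {
            // (1) Legacy state backends that also act as checkpoint storage take precedence.
            return (CheckpointStorage) configuredStateBackend;
        } else if (fromApplication instanceof ConfigurableCheckpointStorage) {
            // (2a) Application storage that picks up additional configuration values.
            return ((ConfigurableCheckpointStorage) fromApplication).configure(config);
        } else if (fromApplication != null) {
            // (2b) Application storage used as-is.
            return fromApplication;
        } else {
            // (3) Nothing from the application: defer to the configuration or a default.
            return fromConfigOrDefault.get();
        }
    }
}
```

With one `return` per branch, the mutable `CheckpointStorage storage;` local and the nested `if (fromApplication != null)` block from the diff are no longer needed. Because `instanceof` evaluates to `false` for `null`, testing `ConfigurableCheckpointStorage` before the plain `!= null` case keeps the same precedence as the nested version.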
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java ########## @@ -79,6 +81,9 @@ @JsonProperty(FIELD_NAME_STATE_BACKEND) private final String stateBackend; + @JsonProperty(FIELD_NAME_CHECKPOINT_STORAGE) + private final String checkpointStorage; Review comment: I couldn't find where this field is rendered in the UI. Or is it implemented in the subsequent PR? ---------------------------------------------------------------- This is an automated message from the Apache Git Service.
