rkhachatryan commented on a change in pull request #18223:
URL: https://github.com/apache/flink/pull/18223#discussion_r791776997
##########
File path:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
##########
@@ -50,16 +50,45 @@
private Savepoint() {}
+ /**
+ * Loads an existing savepoint. Useful if you want to query, modify, or extend the state of an
+ * existing application. The savepoint will be read using the state backend defined via the
+ * clusters configuration.
+ *
+ * @param env The execution environment used to transform the savepoint.
+ * @param path The path to an existing savepoint on disk.
+ * @see #load(ExecutionEnvironment, String, StateBackend)
+ */
+ public static ExistingSavepoint load(ExecutionEnvironment env, String path) throws IOException {
Review comment:
I think most of the code can be shared with the overloaded version of
this method (by extracting a private method and calling it from both).
Ditto for `create()`.
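
A minimal sketch of the refactoring suggested above, assuming two overloads that differ only in whether a backend is passed. `SavepointLoadSketch`, `loadInternal`, and the `String` stand-ins for the savepoint and backend types are hypothetical and are not the actual Flink code:

```java
import java.util.Objects;

/** Hypothetical stand-in for the two load(...) overloads; not the actual Flink code. */
final class SavepointLoadSketch {

    private SavepointLoadSketch() {}

    /** Overload without a backend: the backend is taken from the configuration. */
    static String load(String path) {
        return loadInternal(path, null);
    }

    /** Overload with an explicitly supplied backend. */
    static String load(String path, String backend) {
        Objects.requireNonNull(backend, "backend");
        return loadInternal(path, backend);
    }

    /** Shared logic extracted from both overloads (assumed name). */
    private static String loadInternal(String path, String backendOrNull) {
        Objects.requireNonNull(path, "path");
        String source = backendOrNull == null ? "configured backend" : backendOrNull;
        return "savepoint at " + path + " read with " + source;
    }
}
```

Both public overloads stay thin, so validation and metadata loading live in one place.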
##########
File path:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/UnionStateInputFormat.java
##########
@@ -40,10 +42,16 @@
* Creates an input format for reading union state from an operator in a savepoint.
*
* @param operatorState The state to be queried.
+ * @param configuration The cluster configuration for restoring the backend.
+ * @param backend The state backend used to restore the state.
* @param descriptor The descriptor for this state, providing a name and serializer.
*/
- public UnionStateInputFormat(OperatorState operatorState, ListStateDescriptor<OT> descriptor) {
- super(operatorState, true);
+ public UnionStateInputFormat(
+ OperatorState operatorState,
+ Configuration configuration,
+ StateBackend backend,
Review comment:
Should this be `@Nullable`?
Ditto for `BootstrapTransformation`, `ListStateInputFormat`, and
`BroadcastStateInputFormat`.
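
A minimal sketch of what the annotation would communicate, assuming a null backend means "resolve it from the configuration". The local `@Nullable` declaration only keeps the example self-contained (a real project would use an existing annotation such as `javax.annotation.Nullable`); `InputFormatSketch` and `effectiveBackend` are hypothetical names:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Objects;

/** Local stand-in for an existing @Nullable annotation, declared here only for self-containment. */
@Documented
@Target({ElementType.PARAMETER, ElementType.FIELD})
@interface Nullable {}

/** Hypothetical stand-in for an input format that accepts an optional backend. */
final class InputFormatSketch {

    private final String operatorState;

    /** Null means that no backend was supplied explicitly. */
    @Nullable private final String backend;

    InputFormatSketch(String operatorState, @Nullable String backend) {
        this.operatorState = Objects.requireNonNull(operatorState, "operatorState");
        this.backend = backend; // intentionally not null-checked
    }

    /** Returns the explicit backend if present, otherwise the configured one. */
    String effectiveBackend(String configuredBackend) {
        return backend != null ? backend : configuredBackend;
    }
}
```

The annotation documents the contract at the call site, so callers and static analysis can see that passing `null` is allowed.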
##########
File path:
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/CustomStateBackendFactory.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.utils;
+
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
+
+import java.io.IOException;
+
+/** A simple custom {@link StateBackendFactory} that throws an exception. */
+public class CustomStateBackendFactory implements StateBackendFactory<StateBackend> {
+
+ @Override
+ public StateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader)
+ throws IllegalConfigurationException, IOException {
+ throw new ExpectedException();
Review comment:
Do I understand the purpose of this factory correctly: it verifies that, if a
backend is not provided, it is loaded from the config, by checking that
`createFromConfig` is called?
If so, wouldn't it be possible to use `MockStateBackend` and then check,
perhaps in an operator, that it is the actual backend? I think such a check
would be more reliable.
OTOH, the effort might not be worth it, so I'm also fine with the current
approach.
##########
File path:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
##########
@@ -78,12 +110,36 @@ public static SavepointWriter fromExistingSavepoint(String path, StateBackend st
return new SavepointWriter(savepointMetadata, stateBackend);
}
+ /**
+ * Creates a new savepoint. The savepoint will be written using the state backend defined via
+ * the clusters configuration.
+ *
+ * @param maxParallelism The max parallelism of the savepoint.
+ * @return A {@link SavepointWriter}.
+ * @see #newSavepoint(StateBackend, int)
+ * @see #withConfiguration(ConfigOption, Object)
+ */
+ public static SavepointWriter newSavepoint(int maxParallelism) {
+ Preconditions.checkArgument(
+ maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+ "Maximum parallelism must be between 1 and "
+ + UPPER_BOUND_MAX_PARALLELISM
+ + ". Found: "
+ + maxParallelism);
+
+ SavepointMetadataV2 metadata =
+ new SavepointMetadataV2(
+ maxParallelism, Collections.emptyList(), Collections.emptyList());
Review comment:
I see that inside the `SavepointMetadataV2` constructor, one list is
defensively copied while the other (`masterStates`) is not. Since
`Collections.emptyList()` is immutable, this could cause an
`UnsupportedOperationException` if anything attempts to modify it.
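
A minimal sketch of the hazard described above, assuming a constructor that copies one collection and stores the other as passed. `MetadataSketch`, its fields, and its parameter order are hypothetical and do not mirror the real `SavepointMetadataV2` signature:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a metadata class; not the actual SavepointMetadataV2. */
final class MetadataSketch {

    /** Stored as passed in, so it may be an immutable collection. */
    private final Collection<String> masterStates;

    /** Defensively copied, so it is always mutable. */
    private final List<String> operatorStates;

    MetadataSketch(Collection<String> masterStates, Collection<String> operatorStates) {
        this.masterStates = masterStates;
        this.operatorStates = new ArrayList<>(operatorStates);
    }

    void addOperatorState(String state) {
        operatorStates.add(state); // safe: operates on the private copy
    }

    void addMasterState(String state) {
        // Throws UnsupportedOperationException if the caller passed Collections.emptyList().
        masterStates.add(state);
    }

    public static void main(String[] args) {
        MetadataSketch metadata =
                new MetadataSketch(Collections.emptyList(), Collections.emptyList());
        metadata.addOperatorState("op");
        try {
            metadata.addMasterState("master");
        } catch (UnsupportedOperationException e) {
            System.out.println("masterStates is immutable");
        }
    }
}
```

Copying both arguments in the constructor (for example with `new ArrayList<>(masterStates)`) would remove the asymmetry.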
##########
File path:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
##########
@@ -52,12 +54,42 @@
@PublicEvolving
public class SavepointWriter {
+ /**
+ * Loads an existing savepoint. Useful if you want to modify or extend the state of an existing
+ * application. The savepoint will be written using the state backend defined via the clusters
+ * configuration.
Review comment:
What happens if the configuration doesn't (explicitly) specify any
backend? I guess `HashMapStateBackend` will be used, right?
NIT: Should this be documented here and tested in `SavepointWriterTest`? Or
at least referenced in the javadoc?
Ditto for `newSavepoint()` and `SavepointReader`.
--
This is an automated message from the Apache Git Service.