Myasuka commented on a change in pull request #13500:
URL: https://github.com/apache/flink/pull/13500#discussion_r503146610
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -508,12 +515,18 @@ public void setStateBackend(StateBackend backend) {
if (backend != null) {
try {
InstantiationUtil.writeObjectToConfig(backend,
this.config, STATE_BACKEND);
+
this.config.setBoolean(STATE_BACKEND_USE_MANAGED_MEMORY,
backend.useManagedMemory());
Review comment:
For the purpose to test method, I think we should use
`setStateBackendUsesManagedMemory(backend.useManagedMemory());` here
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -101,6 +101,10 @@
private static final String TIME_CHARACTERISTIC = "timechar";
private static final String MANAGED_MEMORY_FRACTION_PREFIX =
"managedMemFraction.";
+ private static final ConfigOption<Boolean>
STATE_BACKEND_USE_MANAGED_MEMORY = ConfigOptions
Review comment:
Add description for this config could help more.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -233,6 +236,41 @@ public static StateBackend
fromApplicationOrConfigOrDefault(
return backend;
}
+ /**
+ * Checks whether state backend uses managed memory, without having to
deserialize or load the state backend.
+ * @param config Cluster configuration.
+ * @param stateBackendFromApplicationUsesManagedMemory Whether the
application-defined backend uses Flink's managed
+ * memory. Empty
if application has not defined a backend.
+ * @param classLoader User code classloader.
+ * @return Whether the state backend uses managed memory.
+ */
+ public static boolean
stateBackendFromApplicationOrConfigOrDefaultUseManagedMemory(
+ Configuration config,
+ Optional<Boolean>
stateBackendFromApplicationUsesManagedMemory,
+ ClassLoader classLoader) {
+
+ checkNotNull(config, "config");
+
+ // (1) the application defined state backend has precedence
+ if (stateBackendFromApplicationUsesManagedMemory.isPresent()) {
+ return
stateBackendFromApplicationUsesManagedMemory.get();
+ }
+
+ // (2) check if the config defines a state backend
+ try {
+ final StateBackend fromConfig =
loadStateBackendFromConfig(config, classLoader, null);
Review comment:
We could pass parameter `LOG` instead of `null` here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]