This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9cc55d6dc11 [hotfix][python] Use off-heap memory if managed memory
fraction is 0
9cc55d6dc11 is described below
commit 9cc55d6dc11024311c88b024546305e833d3d025
Author: Dian Fu <[email protected]>
AuthorDate: Tue Dec 13 14:14:33 2022 +0800
[hotfix][python] Use off-heap memory if managed memory fraction is 0
---
.../python/beam/BeamPythonFunctionRunner.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 474b633b24d..510d77723a1 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -248,14 +248,10 @@ public abstract class BeamPythonFunctionRunner implements
PythonFunctionRunner {
Struct pipelineOptions =
PipelineOptionsTranslation.toProto(portableOptions);
- if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) {
- Preconditions.checkArgument(
- managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
- String.format(
- "The configured managed memory fraction for Python
worker process must be within (0, 1], was: %s. "
- + "It may be because the consumer type
\"Python\" was missing or set to 0 for the config option
\"taskmanager.memory.managed.consumer-weights\".",
- managedMemoryFraction));
-
+ if (memoryManager != null
+ && config.get(USE_MANAGED_MEMORY)
+ && managedMemoryFraction > 0
+ && managedMemoryFraction <= 1.0) {
final LongFunctionWithException<PythonSharedResources, Exception>
initializer =
(size) ->
new PythonSharedResources(
@@ -274,6 +270,15 @@ public abstract class BeamPythonFunctionRunner implements
PythonFunctionRunner {
sharedResources.getResourceHandle().getEnvironment();
stageBundleFactory = createStageBundleFactory(jobBundleFactory,
environment);
} else {
+ if (memoryManager != null
+ && config.get(USE_MANAGED_MEMORY)
+ && (managedMemoryFraction <= 0 || managedMemoryFraction >
1.0)) {
+ LOG.warn(
+ String.format(
+ "The configured managed memory fraction for
Python worker process must be within (0, 1], was: %s, use off-heap memory
instead."
+ + "Please see config option
\"taskmanager.memory.managed.consumer-weights\" for more details.",
+ managedMemoryFraction));
+ }
// there is no way to access the MemoryManager for the batch job
of old planner,
// fallback to the way that spawning a Python process for each
Python operator
jobBundleFactory = createJobBundleFactory(pipelineOptions);