hequn8128 commented on a change in pull request #10453: [FLINK-14026][python] 
Manage the resource of Python worker properly
URL: https://github.com/apache/flink/pull/10453#discussion_r354690513
 
 

 ##########
 File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 ##########
 @@ -100,6 +108,30 @@ public void open() throws Exception {
 
                        Map<String, String> jobParams = 
getExecutionConfig().getGlobalJobParameters().toMap();
 
+                       long requiredPythonWorkerMemory =
+                               MemorySize.parse(
+                                       
jobParams.getOrDefault(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE.key(),
+                                               
String.valueOf(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE.defaultValue())))
+                                       
.add(MemorySize.parse(jobParams.getOrDefault(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE.key(),
+                                               
String.valueOf(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE.defaultValue()))))
+                                       .getBytes();
+                       MemoryManager memoryManager = 
getContainingTask().getEnvironment().getMemoryManager();
+                       long availableManagedMemory = 
memoryManager.computeMemorySize(
+                               getOperatorConfig().getManagedMemoryFraction());
+                       if (requiredPythonWorkerMemory <= 
availableManagedMemory) {
+                               memoryManager.reserveMemory(this, 
MemoryType.OFF_HEAP, requiredPythonWorkerMemory);
+                               LOG.info("Reserved memory {} for Python 
worker.", requiredPythonWorkerMemory);
+                               this.reservedMemory = 
requiredPythonWorkerMemory;
+                               // TODO enforce the memory limit of Python 
worker
+                       } else {
+                               LOG.warn("Required Python worker memory {} 
exceeds the available managed off-heap " +
+                                       "memory {}. Skipping reserving off-heap 
memory from the MemoryManager. This does " +
+                                       "not affect the functionality. However, 
it may affect the stability of a job as " +
+                                       "the memory used by the Python worker 
is not managed by Flink.",
+                                       requiredPythonWorkerMemory, 
availableManagedMemory);
+                               this.reservedMemory = -1;
+                       }
 
 Review comment:
   Extract a method for this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to