hequn8128 commented on a change in pull request #10453: [FLINK-14026][python]
Manage the resource of Python worker properly
URL: https://github.com/apache/flink/pull/10453#discussion_r354690513
##########
File path:
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -100,6 +108,30 @@ public void open() throws Exception {
Map<String, String> jobParams =
getExecutionConfig().getGlobalJobParameters().toMap();
+ long requiredPythonWorkerMemory =
+ MemorySize.parse(
+
jobParams.getOrDefault(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE.key(),
+
String.valueOf(PythonOptions.PYTHON_FRAMEWORK_MEMORY_SIZE.defaultValue())))
+
.add(MemorySize.parse(jobParams.getOrDefault(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE.key(),
+
String.valueOf(PythonOptions.PYTHON_DATA_BUFFER_MEMORY_SIZE.defaultValue()))))
+ .getBytes();
+ MemoryManager memoryManager =
getContainingTask().getEnvironment().getMemoryManager();
+ long availableManagedMemory =
memoryManager.computeMemorySize(
+ getOperatorConfig().getManagedMemoryFraction());
+ if (requiredPythonWorkerMemory <=
availableManagedMemory) {
+ memoryManager.reserveMemory(this,
MemoryType.OFF_HEAP, requiredPythonWorkerMemory);
+ LOG.info("Reserved memory {} for Python
worker.", requiredPythonWorkerMemory);
+ this.reservedMemory =
requiredPythonWorkerMemory;
+ // TODO enforce the memory limit of Python
worker
+ } else {
+ LOG.warn("Required Python worker memory {}
exceeds the available managed off-heap " +
+ "memory {}. Skipping reserving off-heap
memory from the MemoryManager. This does " +
+ "not affect the functionality. However,
it may affect the stability of a job as " +
+ "the memory used by the Python worker
is not managed by Flink.",
+ requiredPythonWorkerMemory,
availableManagedMemory);
+ this.reservedMemory = -1;
+ }
Review comment:
Could we extract this Python worker memory reservation logic into a separate method, to keep open() short?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services