dianfu commented on a change in pull request #14621:
URL: https://github.com/apache/flink/pull/14621#discussion_r557003451
##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1736,6 +1736,9 @@ def from_data_stream(self, data_stream: DataStream, *fields: Union[str, Expressi
.. versionadded:: 1.12.0
"""
j_data_stream = data_stream._j_data_stream
+ get_gateway().jvm \
+ .org.apache.flink.python.util.PythonConfigUtil.setManagedMemory(
+ j_data_stream.getTransformation(),
self._j_tenv.getConfig().getConfiguration())
Review comment:
The configuration may be set via flink-conf.yaml, in which case it is not
available in self._j_tenv.getConfig().getConfiguration().
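To illustrate the concern, here is a minimal plain-Python sketch that does not use any Flink API. The two dicts stand in for the cluster-level settings (as if loaded from flink-conf.yaml) and the job-level table config. The key name and the helper `use_managed_memory` are assumptions made for this illustration, not code from the PR.

```python
# Assumed option key for PythonOptions.USE_MANAGED_MEMORY (illustration only).
MANAGED_MEMORY_KEY = "python.fn-execution.memory.managed"


def is_enabled(conf: dict) -> bool:
    """Read the boolean flag from a single config dict, defaulting to False."""
    return str(conf.get(MANAGED_MEMORY_KEY, "false")).lower() == "true"


def use_managed_memory(table_conf: dict, cluster_conf: dict) -> bool:
    """Hypothetical helper: job-level values override cluster-level ones."""
    merged = {**cluster_conf, **table_conf}
    return is_enabled(merged)


# The option is set only at cluster level, as if it came from flink-conf.yaml.
cluster_conf = {MANAGED_MEMORY_KEY: "true"}
table_conf = {}

# Checking the table config alone misses the cluster-level setting.
only_table = is_enabled(table_conf)
# Checking the merged view picks it up.
merged = use_managed_memory(table_conf, cluster_conf)
```

With the option set only at cluster level, the table-only lookup reports the feature as disabled while the merged lookup reports it as enabled, which is the gap described above.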
##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -215,6 +221,20 @@ private static boolean isPythonOperator(StreamOperatorFactory streamOperatorFact
}
}
+ private static boolean isPythonOperator(Transformation<?> transform) {
+ if (transform instanceof OneInputTransformation
Review comment:
What about refactoring it a bit to make it more readable?
```
if (transform instanceof OneInputTransformation) {
    return isPythonOperator(((OneInputTransformation) transform).getOperatorFactory());
} else if (transform instanceof TwoInputTransformation) {
    return isPythonOperator(((TwoInputTransformation) transform).getOperatorFactory());
} else {
    Preconditions.checkState(transform instanceof AbstractMultipleInputTransformation);
    return isPythonOperator(
        ((AbstractMultipleInputTransformation) transform).getOperatorFactory());
}
```
##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -128,6 +128,23 @@ private static void chainStreamNode(
firstStream.setSlotSharingGroup(secondStream.getSlotSharingGroup());
}
+ /** Set Python Operator Use Managed Memory. */
+ public static void setManagedMemory(Transformation<?> transformation, Configuration config) {
+ if (config.getBoolean(PythonOptions.USE_MANAGED_MEMORY)) {
+ setManagedMemory(transformation);
+ }
+ }
+
+ private static void setManagedMemory(Transformation<?> transformation) {
+ List<Transformation<?>> inputTransformations = transformation.getInputs();
Review comment:
nit: What about moving this line to just before
`for (Transformation inputTransformation : inputTransformations)`, where it's used?