dianfu commented on a change in pull request #16063:
URL: https://github.com/apache/flink/pull/16063#discussion_r658546898



##########
File path: flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
##########
@@ -65,6 +65,15 @@
                             "When it is false, metric for Python will be disabled. You can "
                                     + "disable the metric to achieve better performance at some circumstance.");
 
+    /** The configuration to enable or disable profile for Python execution. */
+    public static final ConfigOption<Boolean> PYTHON_PROFILE_ENABLED =
+            ConfigOptions.key("python.profile.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When it is true, profile for Python will be enabled. You can "

Review comment:
       Specifies whether to enable Python worker profiling. The profiling 
results are written to the TaskManager log file periodically. The interval 
between profiling runs is determined by the config options 
python.fn-execution.bundle.size and python.fn-execution.bundle.time.

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -71,18 +75,11 @@ def progress_metrics(self):
 
     def process(self, o: WindowedValue):
         with self.scoped_process_state:
-            output_stream = self.consumer.output_stream
-            if isinstance(self.operation, BundleOperation):
-                for value in o.value:
-                    self.process_element(value)
-                self._value_coder_impl.encode_to_stream(
-                    self.operation.finish_bundle(), output_stream, True)
-                output_stream.maybe_flush()
+            if self._profile_enabled:
+                with self._profiler:

Review comment:
       What about performing the profiling per bundle instead?
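
To illustrate the per-bundle idea, here is a minimal sketch that uses only the standard-library cProfile and pstats modules. The function name process_bundle and its parameters are hypothetical and are not taken from the PR; the point is only that one profiling session covers every element of a bundle.

```python
import cProfile
import io
import pstats


def process_bundle(elements, process_element):
    """Process every element of one bundle under a single profiling session.

    Hypothetical helper: `elements` is the bundle and `process_element` is
    the per-element callable. Neither name comes from the PR.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        results = [process_element(e) for e in elements]
    finally:
        # Always stop profiling, even if an element raises.
        profiler.disable()

    # Render the top entries, sorted by cumulative time, into a string.
    buf = io.StringIO()
    pstats.Stats(profiler, stream=buf).sort_stats("cumulative").print_stats(5)
    return results, buf.getvalue()
```

Profiling per bundle amortizes the enable/disable overhead over many elements, which is presumably why the reported interval would then follow the bundle size and bundle time options.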

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -37,6 +38,9 @@ def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cl
         self.operation = self.generate_operation()
         self.process_element = self.operation.process_element
         self.operation.open()
+        self._profile_enabled = self.operation.is_profile_enabled()
+        if self._profile_enabled:

Review comment:
       The instance variable _profile_enabled could be removed by checking 
whether self._profiler is None instead.
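
A rough sketch of that suggestion, assuming the profiler is simply left as None when profiling is disabled. OperationWrapper is a hypothetical stand-in for the class in the diff, and cProfile.Profile is used directly here only to keep the example self-contained (it supports the with statement since Python 3.8).

```python
import cProfile


class OperationWrapper:
    """Hypothetical stand-in for the operation class touched by the diff."""

    def __init__(self, process_element, profile_enabled=False):
        self.process_element = process_element
        # None means "profiling disabled"; no separate boolean flag is kept.
        self._profiler = cProfile.Profile() if profile_enabled else None

    def process(self, value):
        if self._profiler is None:
            return self.process_element(value)
        # The profiler is enabled on entry and disabled on exit.
        with self._profiler:
            return self.process_element(value)
```

With this shape there is a single source of truth, so the flag and the profiler object cannot get out of sync.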

##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -478,13 +479,14 @@ def close_func():
     return process_element_func, open_func, close_func
 
 
-"""
-All these Enum Classes MUST be in sync with
-org.apache.flink.streaming.api.utils.PythonOperatorUtils if there are any changes.
-"""
+class Profiler(object):

Review comment:
       What about moving it to a separate file, such as profiler.py?
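
For illustration, a standalone profiler.py could look roughly like the sketch below. This is an assumption about the shape of such a module, not the class from the PR: the start, close and report method names are hypothetical, and the context-manager methods are included only because the diff uses the profiler in a with statement.

```python
import cProfile
import io
import pstats


class Profiler(object):
    """Thin wrapper around cProfile (illustrative sketch, not the PR's code)."""

    def __init__(self):
        self._pr = cProfile.Profile()

    def start(self):
        self._pr.enable()

    def close(self):
        self._pr.disable()

    def report(self, limit=10):
        """Return the collected stats, sorted by cumulative time, as text."""
        buf = io.StringIO()
        stats = pstats.Stats(self._pr, stream=buf)
        stats.sort_stats("cumulative").print_stats(limit)
        return buf.getvalue()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never swallow exceptions from the profiled block
```

Keeping the class in its own module would leave operation_utils.py free of profiling imports, and the operations could import Profiler only when profiling is enabled.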





