dianfu commented on a change in pull request #16063:
URL: https://github.com/apache/flink/pull/16063#discussion_r658546898
##########
File path: flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
##########
@@ -65,6 +65,15 @@
"When it is false, metric for Python will be disabled. You can "
+ "disable the metric to achieve better performance at some circumstance.");
+ /** The configuration to enable or disable profile for Python execution. */
+ public static final ConfigOption<Boolean> PYTHON_PROFILE_ENABLED =
+ ConfigOptions.key("python.profile.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When it is true, profile for Python will be enabled. You can "
Review comment:
Specifies whether to enable Python worker profiling. The profile result
will be written periodically to the TaskManager log file. The interval
between two consecutive profile reports is determined by the config options
python.fn-execution.bundle.size and python.fn-execution.bundle.time.
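A minimal sketch of how the two bundle options would bound the reporting interval, assuming a bundle is finished as soon as it holds bundle.size elements or has been open for bundle.time milliseconds, whichever comes first, and that one profile report is emitted per finished bundle. BundleTrigger, on_element and reset are hypothetical names, not PyFlink API.

```python
import time


class BundleTrigger:
    """Hypothetical helper: decides when the current bundle should be finished.

    Assumes a bundle ends once it holds max_bundle_size elements or has been
    open for max_bundle_time_ms milliseconds, whichever comes first.
    """

    def __init__(self, max_bundle_size, max_bundle_time_ms, clock=time.monotonic):
        self._max_size = max_bundle_size
        self._max_time_s = max_bundle_time_ms / 1000.0
        self._clock = clock
        self._count = 0
        self._started_at = None

    def on_element(self):
        """Record one element; return True if the bundle should be finished now."""
        if self._count == 0:
            # The bundle's timer starts with its first element.
            self._started_at = self._clock()
        self._count += 1
        elapsed = self._clock() - self._started_at
        return self._count >= self._max_size or elapsed >= self._max_time_s

    def reset(self):
        """Call after finishing a bundle, i.e. after emitting one profile report."""
        self._count = 0
        self._started_at = None
```

Under this rule, each profile report covers at most bundle.size elements or bundle.time milliseconds of work.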
##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -71,18 +75,11 @@ def progress_metrics(self):
def process(self, o: WindowedValue):
with self.scoped_process_state:
- output_stream = self.consumer.output_stream
- if isinstance(self.operation, BundleOperation):
- for value in o.value:
- self.process_element(value)
- self._value_coder_impl.encode_to_stream(
- self.operation.finish_bundle(), output_stream, True)
- output_stream.maybe_flush()
+ if self._profile_enabled:
+ with self._profiler:
Review comment:
What about performing the profiling bundle-wise?
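A minimal sketch of bundle-wise profiling, assuming the standard-library cProfile profiler: it is enabled once around all process_element calls of a bundle and its stats are formatted when the bundle finishes, rather than entering and exiting the profiler for every element. The function profile_bundle and its top_n parameter are hypothetical.

```python
import cProfile
import io
import pstats


def profile_bundle(values, process_element, top_n=10):
    """Process one bundle under a single profiling session.

    Returns the formatted stats for the whole bundle; an operator would
    write this text to the TaskManager log instead of returning it.
    """
    profiler = cProfile.Profile()
    profiler.enable()  # started once per bundle, not once per element
    try:
        for value in values:
            process_element(value)
    finally:
        profiler.disable()
    out = io.StringIO()
    pstats.Stats(profiler, stream=out).sort_stats("cumulative").print_stats(top_n)
    return out.getvalue()
```

The design choice is one enable/disable pair per bundle instead of per element, with each report aggregating the whole bundle.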
##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -37,6 +38,9 @@ def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cl
self.operation = self.generate_operation()
self.process_element = self.operation.process_element
self.operation.open()
+ self._profile_enabled = self.operation.is_profile_enabled()
+ if self._profile_enabled:
Review comment:
The instance variable _profile_enabled could be removed by checking if
self._profiler is None.
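A minimal sketch of the suggestion, assuming the operation stores a cProfile.Profile instance in self._profiler: a None profiler already means "profiling disabled", so the separate boolean becomes redundant. ProfiledOperation is a hypothetical stand-in for the Beam operation class in the diff.

```python
import cProfile


class ProfiledOperation:
    """Hypothetical operation wrapper: the profiler doubles as the on/off flag."""

    def __init__(self, process_element, profile_enabled):
        self.process_element = process_element
        # None means profiling is disabled; no separate _profile_enabled field.
        self._profiler = cProfile.Profile() if profile_enabled else None

    def process(self, values):
        if self._profiler is not None:
            self._profiler.enable()
        try:
            for value in values:
                self.process_element(value)
        finally:
            if self._profiler is not None:
                self._profiler.disable()
```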
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -478,13 +479,14 @@ def close_func():
return process_element_func, open_func, close_func
-"""
-All these Enum Classes MUST be in sync with
-org.apache.flink.streaming.api.utils.PythonOperatorUtils if there are any changes.
-"""
+class Profiler(object):
Review comment:
What about moving it to a separate file, such as profiler.py?
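A minimal sketch of what a standalone profiler.py might contain, assuming the class wraps cProfile and writes its report through the standard logging module. The start/close methods, the sort key and the limit are assumptions for illustration, not the Profiler API in the pull request.

```python
# profiler.py (hypothetical module layout)
import cProfile
import io
import logging
import pstats

_LOG = logging.getLogger(__name__)


class Profiler(object):
    """Wraps cProfile so that operations only need start() and close()."""

    def __init__(self, sort_key="cumulative", limit=20):
        self._profiler = cProfile.Profile()
        self._sort_key = sort_key
        self._limit = limit

    def start(self):
        self._profiler.enable()

    def close(self):
        """Stop profiling, log the report and return it as a string."""
        self._profiler.disable()
        out = io.StringIO()
        stats = pstats.Stats(self._profiler, stream=out)
        stats.sort_stats(self._sort_key).print_stats(self._limit)
        report = out.getvalue()
        _LOG.info("Python worker profile:\n%s", report)
        return report
```

The operations would then import Profiler from this module instead of from operation_utils.py, keeping the enum classes in operation_utils.py separate from the profiling code.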
--
This is an automated message from the Apache Git Service.