This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 208d1ef [FLINK-22862][python] Support profiling in PyFlink
208d1ef is described below
commit 208d1ef2aea7aa4907f15e4e229c99b07c1e0921
Author: huangxingbo <[email protected]>
AuthorDate: Thu Jun 3 14:32:38 2021 +0800
[FLINK-22862][python] Support profiling in PyFlink
This closes #16063.
---
docs/content.zh/docs/dev/python/debugging.md | 11 ++
docs/content/docs/dev/python/debugging.md | 10 ++
.../shortcodes/generated/python_configuration.html | 6 +
.../fn_execution/beam/beam_operations_fast.pxd | 1 +
.../fn_execution/beam/beam_operations_fast.pyx | 9 +
.../fn_execution/beam/beam_operations_slow.py | 9 +
.../pyflink/fn_execution/flink_fn_execution_pb2.py | 189 ++++++++++++---------
.../{beam/beam_operations_fast.pxd => profiler.py} | 28 ++-
.../pyflink/proto/flink-fn-execution.proto | 4 +
.../java/org/apache/flink/python/PythonConfig.java | 8 +
.../org/apache/flink/python/PythonOptions.java | 11 ++
.../AbstractPythonStreamAggregateOperator.java | 1 +
...stractArrowPythonAggregateFunctionOperator.java | 1 +
...wPythonOverWindowAggregateFunctionOperator.java | 1 +
.../AbstractPythonScalarFunctionOperator.java | 1 +
.../python/table/PythonTableFunctionOperator.java | 1 +
.../org/apache/flink/python/PythonOptionsTest.java | 16 ++
17 files changed, 206 insertions(+), 101 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/debugging.md b/docs/content.zh/docs/dev/python/debugging.md
index 77f7c30..2692a25 100644
--- a/docs/content.zh/docs/dev/python/debugging.md
+++ b/docs/content.zh/docs/dev/python/debugging.md
@@ -73,3 +73,14 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl
4. 启动刚刚创建的Python Remote Debug Server
5. 运行你的Python代码
+
+
+## Profiling Python UDFs
+
+你可以打开profile来分析性能瓶颈
+
+```python
+t_env.get_config().get_configuration().set_boolean("python.profile.enabled", True)
+```
+
+你可以在[日志](#查看日志)里面查看profile的结果
diff --git a/docs/content/docs/dev/python/debugging.md b/docs/content/docs/dev/python/debugging.md
index aba85c7..2bbc263 100644
--- a/docs/content/docs/dev/python/debugging.md
+++ b/docs/content/docs/dev/python/debugging.md
@@ -74,3 +74,13 @@ You can make use of the [`pydevd_pycharm`](https://pypi.org/project/pydevd-pycha
4. Start the previously created Python Remote Debug Server
5. Run your Python Code
+
+## Profiling Python UDFs
+
+You can enable profiling to analyze performance bottlenecks.
+
+```python
+t_env.get_config().get_configuration().set_boolean("python.profile.enabled", True)
+```
+
+Then you can see the profile result in the [logs](#accessing-logs).
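For reference, a minimal end-to-end sketch of the documented switch in a Table API job; the config key comes from this commit, while the environment setup and the `add_one` UDF are purely illustrative:

```python
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

# Create a TableEnvironment and enable the new profiling switch.
t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())
t_env.get_config().get_configuration().set_boolean("python.profile.enabled", True)


@udf(result_type=DataTypes.BIGINT())
def add_one(x):
    return x + 1


# Any Python UDF executed from now on is profiled per bundle; the cProfile
# report shows up in the TaskManager log once a bundle finishes.
t = t_env.from_elements([(1,), (2,), (3,)], ['a'])
t.select(add_one(col('a'))).execute().print()
```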
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index 1f88dbb..ccf4e54 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -81,6 +81,12 @@
            <td>When it is false, metric for Python will be disabled. You can disable the metric to achieve better performance at some circumstance.</td>
</tr>
<tr>
+ <td><h5>python.profile.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+            <td>Specifies whether to enable Python worker profiling. The profile result will be displayed in the log file of the TaskManager periodically. The interval between each profiling is determined by the config options python.fn-execution.bundle.size and python.fn-execution.bundle.time.</td>
+ </tr>
+ <tr>
<td><h5>python.requirements</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
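Since the profiling interval follows the bundle boundaries described above, the option is typically combined with the bundle settings. A minimal sketch (the bundle values are arbitrary examples, not recommendations):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())
config = t_env.get_config().get_configuration()

# Turn on the profiling switch added by this commit.
config.set_boolean("python.profile.enabled", True)

# Smaller bundles flush the per-bundle profile report to the TaskManager log
# more often; these values are illustrative only.
config.set_string("python.fn-execution.bundle.size", "1000")
config.set_string("python.fn-execution.bundle.time", "1000")
```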
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
index 5adea48..031fbfe 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
@@ -30,6 +30,7 @@ cdef class FunctionOperation(Operation):
cdef object process_element
cdef object operation
cdef object operation_cls
+ cdef object _profiler
cdef object generate_operation(self)
cdef class StatelessFunctionOperation(FunctionOperation):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
index fabc607..545e56c 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -26,6 +26,7 @@ from pyflink.fn_execution.beam.beam_stream_fast cimport BeamOutputStream
from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper
from pyflink.fn_execution.coder_impl_fast cimport InputStreamWrapper
from pyflink.fn_execution.table.operations import BundleOperation
+from pyflink.fn_execution.profiler import Profiler
cdef class FunctionOperation(Operation):
"""
@@ -48,15 +49,23 @@ cdef class FunctionOperation(Operation):
self.operation = self.generate_operation()
self.process_element = self.operation.process_element
self.operation.open()
+ if spec.serialized_fn.profile_enabled:
+ self._profiler = Profiler()
+ else:
+ self._profiler = None
cpdef start(self):
with self.scoped_start_state:
super(FunctionOperation, self).start()
+ if self._profiler:
+ self._profiler.start()
cpdef finish(self):
with self.scoped_finish_state:
super(FunctionOperation, self).finish()
self.operation.finish()
+ if self._profiler:
+ self._profiler.close()
cpdef teardown(self):
with self.scoped_finish_state:
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index 600041f..edab3ae 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -21,6 +21,7 @@ from apache_beam.runners.worker.operations import Operation
from apache_beam.utils.windowed_value import WindowedValue
from pyflink.fn_execution.table.operations import BundleOperation
+from pyflink.fn_execution.profiler import Profiler
class FunctionOperation(Operation):
@@ -37,6 +38,10 @@ class FunctionOperation(Operation):
self.operation = self.generate_operation()
self.process_element = self.operation.process_element
self.operation.open()
+ if spec.serialized_fn.profile_enabled:
+ self._profiler = Profiler()
+ else:
+ self._profiler = None
def setup(self):
super(FunctionOperation, self).setup()
@@ -44,11 +49,15 @@ class FunctionOperation(Operation):
def start(self):
with self.scoped_start_state:
super(FunctionOperation, self).start()
+ if self._profiler:
+ self._profiler.start()
def finish(self):
with self.scoped_finish_state:
super(FunctionOperation, self).finish()
self.operation.finish()
+ if self._profiler:
+ self._profiler.close()
def needs_finalization(self):
return False
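For readers skimming the diff, a stripped-down, hypothetical stand-in (not the real FunctionOperation) that mirrors the hook placement used by both the Cython and the pure-Python operation classes above:

```python
# Hypothetical skeleton only; it mirrors the profiler hooks added in
# beam_operations_fast.pyx / beam_operations_slow.py.
from pyflink.fn_execution.profiler import Profiler


class BundleLifecycle:
    def __init__(self, profile_enabled: bool):
        # Mirrors `spec.serialized_fn.profile_enabled` in the real operations.
        self._profiler = Profiler() if profile_enabled else None

    def start(self):
        if self._profiler:
            self._profiler.start()

    def finish(self):
        if self._profiler:
            # close() disables cProfile and prints stats sorted by cumulative
            # time, which ends up in the Python worker / TaskManager log.
            self._profiler.close()
```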
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index cb58fbf..3937f50 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='flink-fn-execution.proto',
package='org.apache.flink.fn_execution.v1',
syntax='proto3',
- serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
\x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
\x01(\x0c\x12\x37\n\x06inputs\x18\x02
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+ serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
\x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
\x01(\x0c\x12\x37\n\x06inputs\x18\x02
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
)
@@ -82,8 +82,8 @@ _OVERWINDOW_WINDOWTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=693,
- serialized_end=901,
+ serialized_start=718,
+ serialized_end=926,
)
_sym_db.RegisterEnumDescriptor(_OVERWINDOW_WINDOWTYPE)
@@ -108,8 +108,8 @@ _GROUPWINDOW_WINDOWTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=2050,
- serialized_end=2141,
+ serialized_start=2075,
+ serialized_end=2166,
)
_sym_db.RegisterEnumDescriptor(_GROUPWINDOW_WINDOWTYPE)
@@ -138,8 +138,8 @@ _GROUPWINDOW_WINDOWPROPERTY = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=2143,
- serialized_end=2242,
+ serialized_start=2168,
+ serialized_end=2267,
)
_sym_db.RegisterEnumDescriptor(_GROUPWINDOW_WINDOWPROPERTY)
@@ -236,8 +236,8 @@ _SCHEMA_TYPENAME = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=4496,
- serialized_end=4785,
+ serialized_start=4546,
+ serialized_end=4835,
)
_sym_db.RegisterEnumDescriptor(_SCHEMA_TYPENAME)
@@ -338,8 +338,8 @@ _TYPEINFO_TYPENAME = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=5610,
- serialized_end=5905,
+ serialized_start=5660,
+ serialized_end=5955,
)
_sym_db.RegisterEnumDescriptor(_TYPEINFO_TYPENAME)
@@ -388,8 +388,8 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=6652,
- serialized_end=6808,
+ serialized_start=6727,
+ serialized_end=6883,
)
_sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
@@ -410,8 +410,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=7775,
- serialized_end=7807,
+ serialized_start=7850,
+ serialized_end=7882,
)
_sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
@@ -551,6 +551,13 @@ _USERDEFINEDFUNCTIONS = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+      name='profile_enabled', full_name='org.apache.flink.fn_execution.v1.UserDefinedFunctions.profile_enabled', index=3,
+ number=4, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -564,7 +571,7 @@ _USERDEFINEDFUNCTIONS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=371,
- serialized_end=549,
+ serialized_end=574,
)
@@ -609,8 +616,8 @@ _OVERWINDOW = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=552,
- serialized_end=901,
+ serialized_start=577,
+ serialized_end=926,
)
@@ -640,8 +647,8 @@ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1432,
- serialized_end=1516,
+ serialized_start=1457,
+ serialized_end=1541,
)
_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW = _descriptor.Descriptor(
@@ -677,8 +684,8 @@ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1519,
- serialized_end=1670,
+ serialized_start=1544,
+ serialized_end=1695,
)
_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC = _descriptor.Descriptor(
@@ -731,8 +738,8 @@ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC = _descriptor.Descriptor(
      name='data_view', full_name='org.apache.flink.fn_execution.v1.UserDefinedAggregateFunction.DataViewSpec.data_view', index=0, containing_type=None, fields=[]),
],
- serialized_start=1169,
- serialized_end=1683,
+ serialized_start=1194,
+ serialized_end=1708,
)
_USERDEFINEDAGGREGATEFUNCTION = _descriptor.Descriptor(
@@ -796,8 +803,8 @@ _USERDEFINEDAGGREGATEFUNCTION = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=904,
- serialized_end=1683,
+ serialized_start=929,
+ serialized_end=1708,
)
@@ -892,8 +899,8 @@ _GROUPWINDOW = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1686,
- serialized_end=2242,
+ serialized_start=1711,
+ serialized_end=2267,
)
@@ -988,6 +995,13 @@ _USERDEFINEDAGGREGATEFUNCTIONS = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+      name='profile_enabled', full_name='org.apache.flink.fn_execution.v1.UserDefinedAggregateFunctions.profile_enabled', index=12,
+ number=13, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -1000,8 +1014,8 @@ _USERDEFINEDAGGREGATEFUNCTIONS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2245,
- serialized_end=2754,
+ serialized_start=2270,
+ serialized_end=2804,
)
@@ -1038,8 +1052,8 @@ _SCHEMA_MAPINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2832,
- serialized_end=2983,
+ serialized_start=2882,
+ serialized_end=3033,
)
_SCHEMA_TIMEINFO = _descriptor.Descriptor(
@@ -1068,8 +1082,8 @@ _SCHEMA_TIMEINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2985,
- serialized_end=3014,
+ serialized_start=3035,
+ serialized_end=3064,
)
_SCHEMA_TIMESTAMPINFO = _descriptor.Descriptor(
@@ -1098,8 +1112,8 @@ _SCHEMA_TIMESTAMPINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3016,
- serialized_end=3050,
+ serialized_start=3066,
+ serialized_end=3100,
)
_SCHEMA_LOCALZONEDTIMESTAMPINFO = _descriptor.Descriptor(
@@ -1128,8 +1142,8 @@ _SCHEMA_LOCALZONEDTIMESTAMPINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3052,
- serialized_end=3096,
+ serialized_start=3102,
+ serialized_end=3146,
)
_SCHEMA_ZONEDTIMESTAMPINFO = _descriptor.Descriptor(
@@ -1158,8 +1172,8 @@ _SCHEMA_ZONEDTIMESTAMPINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3098,
- serialized_end=3137,
+ serialized_start=3148,
+ serialized_end=3187,
)
_SCHEMA_DECIMALINFO = _descriptor.Descriptor(
@@ -1195,8 +1209,8 @@ _SCHEMA_DECIMALINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3139,
- serialized_end=3186,
+ serialized_start=3189,
+ serialized_end=3236,
)
_SCHEMA_BINARYINFO = _descriptor.Descriptor(
@@ -1225,8 +1239,8 @@ _SCHEMA_BINARYINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3188,
- serialized_end=3216,
+ serialized_start=3238,
+ serialized_end=3266,
)
_SCHEMA_VARBINARYINFO = _descriptor.Descriptor(
@@ -1255,8 +1269,8 @@ _SCHEMA_VARBINARYINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3218,
- serialized_end=3249,
+ serialized_start=3268,
+ serialized_end=3299,
)
_SCHEMA_CHARINFO = _descriptor.Descriptor(
@@ -1285,8 +1299,8 @@ _SCHEMA_CHARINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3251,
- serialized_end=3277,
+ serialized_start=3301,
+ serialized_end=3327,
)
_SCHEMA_VARCHARINFO = _descriptor.Descriptor(
@@ -1315,8 +1329,8 @@ _SCHEMA_VARCHARINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3279,
- serialized_end=3308,
+ serialized_start=3329,
+ serialized_end=3358,
)
_SCHEMA_FIELDTYPE = _descriptor.Descriptor(
@@ -1439,8 +1453,8 @@ _SCHEMA_FIELDTYPE = _descriptor.Descriptor(
      name='type_info', full_name='org.apache.flink.fn_execution.v1.Schema.FieldType.type_info', index=0, containing_type=None, fields=[]),
],
- serialized_start=3311,
- serialized_end=4383,
+ serialized_start=3361,
+ serialized_end=4433,
)
_SCHEMA_FIELD = _descriptor.Descriptor(
@@ -1483,8 +1497,8 @@ _SCHEMA_FIELD = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=4385,
- serialized_end=4493,
+ serialized_start=4435,
+ serialized_end=4543,
)
_SCHEMA = _descriptor.Descriptor(
@@ -1514,8 +1528,8 @@ _SCHEMA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2757,
- serialized_end=4785,
+ serialized_start=2807,
+ serialized_end=4835,
)
@@ -1552,8 +1566,8 @@ _TYPEINFO_MAPTYPEINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=5199,
- serialized_end=5338,
+ serialized_start=5249,
+ serialized_end=5388,
)
_TYPEINFO_ROWTYPEINFO_FIELD = _descriptor.Descriptor(
@@ -1589,8 +1603,8 @@ _TYPEINFO_ROWTYPEINFO_FIELD = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=5434,
- serialized_end=5525,
+ serialized_start=5484,
+ serialized_end=5575,
)
_TYPEINFO_ROWTYPEINFO = _descriptor.Descriptor(
@@ -1619,8 +1633,8 @@ _TYPEINFO_ROWTYPEINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=5341,
- serialized_end=5525,
+ serialized_start=5391,
+ serialized_end=5575,
)
_TYPEINFO_TUPLETYPEINFO = _descriptor.Descriptor(
@@ -1649,8 +1663,8 @@ _TYPEINFO_TUPLETYPEINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=5527,
- serialized_end=5607,
+ serialized_start=5577,
+ serialized_end=5657,
)
_TYPEINFO = _descriptor.Descriptor(
@@ -1711,8 +1725,8 @@ _TYPEINFO = _descriptor.Descriptor(
      name='type_info', full_name='org.apache.flink.fn_execution.v1.TypeInfo.type_info', index=0, containing_type=None, fields=[]),
],
- serialized_start=4788,
- serialized_end=5918,
+ serialized_start=4838,
+ serialized_end=5968,
)
@@ -1749,8 +1763,8 @@ _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=6268,
- serialized_end=6310,
+ serialized_start=6343,
+ serialized_end=6385,
)
_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
@@ -1828,8 +1842,8 @@ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=6313,
- serialized_end=6649,
+ serialized_start=6388,
+ serialized_end=6724,
)
_USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
@@ -1874,6 +1888,13 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+      name='profile_enabled', full_name='org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.profile_enabled', index=5,
+ number=6, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -1887,8 +1908,8 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=5921,
- serialized_end=6808,
+ serialized_start=5971,
+ serialized_end=6883,
)
@@ -1918,8 +1939,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7404,
- serialized_end=7478,
+ serialized_start=7479,
+ serialized_end=7553,
)
_CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -1948,8 +1969,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7480,
- serialized_end=7547,
+ serialized_start=7555,
+ serialized_end=7622,
)
_CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -1978,8 +1999,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7549,
- serialized_end=7618,
+ serialized_start=7624,
+ serialized_end=7693,
)
_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2008,8 +2029,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7620,
- serialized_end=7699,
+ serialized_start=7695,
+ serialized_end=7774,
)
_CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2038,8 +2059,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7701,
- serialized_end=7773,
+ serialized_start=7776,
+ serialized_end=7848,
)
_CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2114,8 +2135,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
      name='data_type', full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type', index=0, containing_type=None, fields=[]),
],
- serialized_start=6811,
- serialized_end=7820,
+ serialized_start=6886,
+ serialized_end=7895,
)
_INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd b/flink-python/pyflink/fn_execution/profiler.py
similarity index 57%
copy from flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
copy to flink-python/pyflink/fn_execution/profiler.py
index 5adea48..df01272 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
+++ b/flink-python/pyflink/fn_execution/profiler.py
@@ -15,25 +15,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-# cython: language_level=3
-from apache_beam.coders.coder_impl cimport StreamCoderImpl
-from apache_beam.runners.worker.operations cimport Operation
+import cProfile
+import pstats
-from pyflink.fn_execution.coder_impl_fast cimport LengthPrefixBaseCoderImpl
-cdef class FunctionOperation(Operation):
- cdef Operation consumer
- cdef bint _is_python_coder
- cdef StreamCoderImpl _value_coder_impl
- cdef LengthPrefixBaseCoderImpl _output_coder
- cdef object process_element
- cdef object operation
- cdef object operation_cls
- cdef object generate_operation(self)
+class Profiler(object):
+ def __init__(self):
+ self._pr = cProfile.Profile()
-cdef class StatelessFunctionOperation(FunctionOperation):
- pass
+ def start(self):
+ self._pr.enable()
-cdef class StatefulFunctionOperation(FunctionOperation):
- cdef object keyed_state_backend
+ def close(self):
+ self._pr.disable()
+ ps = pstats.Stats(self._pr).sort_stats('cumulative')
+ ps.print_stats()
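The new module is a thin wrapper around cProfile and pstats; an illustrative use outside of an operation (the workload is made up):

```python
from pyflink.fn_execution.profiler import Profiler

profiler = Profiler()
profiler.start()                             # cProfile.Profile().enable()

total = sum(i * i for i in range(100_000))   # arbitrary work to profile

profiler.close()                             # disable() + print stats sorted by 'cumulative'
print(total)
```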
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto
index 7dfa83e..81a3fd8 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -64,6 +64,7 @@ message UserDefinedFunctions {
repeated UserDefinedFunction udfs = 1;
bool metric_enabled = 2;
repeated OverWindow windows = 3;
+ bool profile_enabled = 4;
}
// Used to describe the info of over window in pandas batch over window aggregation
@@ -179,6 +180,8 @@ message UserDefinedAggregateFunctions {
// Group Window.
GroupWindow group_window = 12;
+
+ bool profile_enabled = 13;
}
// A representation of the data schema.
@@ -369,6 +372,7 @@ message UserDefinedDataStreamFunction {
bytes payload = 3;
bool metric_enabled = 4;
TypeInfo key_type_info = 5;
+ bool profile_enabled = 6;
}
// ------------------------------------------------------------------------
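Once flink_fn_execution_pb2.py is regenerated from the proto above, the flag is a plain boolean field on the serialized-function messages; a small sketch (message and field names come from this commit, the rest is illustrative):

```python
from pyflink.fn_execution import flink_fn_execution_pb2 as pb2

udfs = pb2.UserDefinedFunctions()
udfs.profile_enabled = True           # new field, number 4
assert udfs.profile_enabled

agg = pb2.UserDefinedAggregateFunctions()
agg.profile_enabled = True            # new field, number 13

ds_fn = pb2.UserDefinedDataStreamFunction()
ds_fn.profile_enabled = True          # new field, number 6
```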
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
index 9fa8383..d862167 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
@@ -86,6 +86,9 @@ public class PythonConfig implements Serializable {
/** The Configuration that contains execution configs and dependencies info. */
private final Configuration config;
+ /** Whether profile is enabled. */
+ private final boolean profileEnabled;
+
public PythonConfig(Configuration config) {
this.config = config;
maxBundleSize = config.get(PythonOptions.MAX_BUNDLE_SIZE);
@@ -106,6 +109,7 @@ public class PythonConfig implements Serializable {
pythonExec = config.get(PythonOptions.PYTHON_EXECUTABLE);
metricEnabled = config.getBoolean(PythonOptions.PYTHON_METRIC_ENABLED);
isUsingManagedMemory = config.getBoolean(PythonOptions.USE_MANAGED_MEMORY);
+        profileEnabled = config.getBoolean(PythonOptions.PYTHON_PROFILE_ENABLED);
}
public int getMaxBundleSize() {
@@ -144,6 +148,10 @@ public class PythonConfig implements Serializable {
return metricEnabled;
}
+ public boolean isProfileEnabled() {
+ return profileEnabled;
+ }
+
public boolean isUsingManagedMemory() {
return isUsingManagedMemory;
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index ec51f6e..7e57df7 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -65,6 +65,17 @@ public class PythonOptions {
"When it is false, metric for Python will be
disabled. You can "
+ "disable the metric to achieve better
performance at some circumstance.");
+    /** The configuration to enable or disable profile for Python execution. */
+    public static final ConfigOption<Boolean> PYTHON_PROFILE_ENABLED =
+            ConfigOptions.key("python.profile.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Specifies whether to enable Python worker profiling. The profile result "
+                                    + "will be displayed in the log file of the TaskManager periodically. "
+                                    + "The interval between each profiling is determined by the config options "
+                                    + "python.fn-execution.bundle.size and python.fn-execution.bundle.time.");
+
public static final ConfigOption<String> PYTHON_FILES =
ConfigOptions.key("python.files")
.stringType()
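On the Python side, the same key can be exercised against a plain Configuration, loosely mirroring the Java test at the bottom of this commit (assuming pyflink.common.Configuration from the same release):

```python
from pyflink.common import Configuration

conf = Configuration()
# Default mirrors PYTHON_PROFILE_ENABLED.defaultValue(), i.e. false.
assert not conf.get_boolean("python.profile.enabled", False)

conf.set_boolean("python.profile.enabled", True)
assert conf.get_boolean("python.profile.enabled", False)
```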
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
index 2db2c08..c845e61 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
@@ -245,6 +245,7 @@ public abstract class AbstractPythonStreamAggregateOperator
FlinkFnApi.UserDefinedAggregateFunctions.Builder builder =
FlinkFnApi.UserDefinedAggregateFunctions.newBuilder();
builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
+ builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
builder.addAllGrouping(Arrays.stream(grouping).boxed().collect(Collectors.toList()));
builder.setGenerateUpdateBefore(generateUpdateBefore);
builder.setIndexOfCountStar(indexOfCountStar);
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
index ae94f7e..2ef2038 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
@@ -154,6 +154,7 @@ public abstract class AbstractArrowPythonAggregateFunctionOperator
builder.addUdfs(getUserDefinedFunctionProto(pythonFunctionInfo));
}
builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
+ builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
return builder.build();
}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
index 414e6e6..a004baa 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
@@ -257,6 +257,7 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperator
builder.addUdfs(functionBuilder);
}
builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
+ builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
// add windows
for (int i = 0; i < lowerBoundary.length; i++) {
FlinkFnApi.OverWindow.Builder windowBuilder = FlinkFnApi.OverWindow.newBuilder();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
index 4f9255d..b286164 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
@@ -120,6 +120,7 @@ public abstract class AbstractPythonScalarFunctionOperator
builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
}
builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
+ builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
return builder.build();
}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index 9efd17f..0a51717 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -157,6 +157,7 @@ public class PythonTableFunctionOperator
        FlinkFnApi.UserDefinedFunctions.Builder builder =
                FlinkFnApi.UserDefinedFunctions.newBuilder();
builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(tableFunction));
builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
+ builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
return builder.build();
}
diff --git a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
index c77b3fb..9dcb4a9 100644
--- a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
@@ -92,6 +92,22 @@ public class PythonOptionsTest {
}
@Test
+ public void testPythonProfileEnabled() {
+ final Configuration configuration = new Configuration();
+ final boolean isProfileEnabled =
+ configuration.getBoolean(PythonOptions.PYTHON_PROFILE_ENABLED);
+        assertThat(
+                isProfileEnabled, is(equalTo(PythonOptions.PYTHON_PROFILE_ENABLED.defaultValue())));
+
+ final boolean expectedIsProfileEnabled = true;
+ configuration.setBoolean(PythonOptions.PYTHON_PROFILE_ENABLED, true);
+
+ final boolean actualIsProfileEnabled =
+ configuration.getBoolean(PythonOptions.PYTHON_PROFILE_ENABLED);
+        assertThat(actualIsProfileEnabled, is(equalTo(expectedIsProfileEnabled)));
+ }
+
+ @Test
public void testPythonFiles() {
final Configuration configuration = new Configuration();
final Optional<String> defaultPythonFiles =