[
https://issues.apache.org/jira/browse/FLINK-26462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Huang Xingbo updated FLINK-26462:
---------------------------------
Description:
h1. Setup
* Build and compile the Flink source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}
* Prepare a Python Virtual Environment
{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}
* Install PyFlink from source code. For more details, you can refer to the
[doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py sdist
$ pip install dist/*.tar.gz
{code}
h1. Test
* Write a python udf job named demo.py in process mode
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr
class SubtractOne(ScalarFunction):
def eval(self, i):
return i - 1
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT())
def add(i, j):
return i + j
def main():
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# process mode !
t_env.get_config().get_configuration().set_string("python.execution-mode",
"process")
# optional values
t_env.get_config().get_configuration().set_string("parallelism.default",
"2")
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
print(result.to_pandas())
if __name__ == '__main__':
main()
{code}
* run the python udf job and watch the result
{code:bash}
$ python demo.py
_c0 c _c2
0 3 1 1
1 7 2 1
2 4 3 1
{code}
* change the python udf job to multi-thread mode
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr
class SubtractOne(ScalarFunction):
def eval(self, i):
return i - 1
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT())
def add(i, j):
return i + j
def main():
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# multi-thread mode
t_env.get_config().get_configuration().set_string("python.execution-mode",
"multi-thread")
t_env.get_config().get_configuration().set_string("parallelism.default",
"2")
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
print(result.to_pandas())
if __name__ == '__main__':
main()
{code}
* run the python udf job and watch the result
{code:bash}
$ python demo.py
_c0 c _c2
0 3 1 1
1 7 2 1
2 4 3 1
{code}
was:
h1. Setup
Prepare a Python Virtual Environment
{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}
h1. Test
* Write a python udf job named demo.py in process mode
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr
class SubtractOne(ScalarFunction):
def eval(self, i):
return i - 1
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT())
def add(i, j):
return i + j
def main():
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# process mode !
t_env.get_config().get_configuration().set_string("python.execution-mode",
"process")
# optional values
t_env.get_config().get_configuration().set_string("parallelism.default",
"2")
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
print(result.to_pandas())
if __name__ == '__main__':
main()
{code}
* run the python udf job and watch the result
{code:bash}
$ python demo.py
_c0 c _c2
0 3 1 1
1 7 2 1
2 4 3 1
{code}
* change the python udf job to multi-thread mode
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr
class SubtractOne(ScalarFunction):
def eval(self, i):
return i - 1
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT())
def add(i, j):
return i + j
def main():
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# multi-thread mode
t_env.get_config().get_configuration().set_string("python.execution-mode",
"multi-thread")
t_env.get_config().get_configuration().set_string("parallelism.default",
"2")
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
print(result.to_pandas())
if __name__ == '__main__':
main()
{code}
* run the python udf job and watch the result
{code:bash}
$ python demo.py
_c0 c _c2
0 3 1 1
1 7 2 1
2 4 3 1
{code}
> Release Testing: Running Python UDF in different Execution Mode
> ---------------------------------------------------------------
>
> Key: FLINK-26462
> URL: https://issues.apache.org/jira/browse/FLINK-26462
> Project: Flink
> Issue Type: Improvement
> Components: API / Python
> Affects Versions: 1.15.0
> Reporter: Huang Xingbo
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.15.0
>
>
> h1. Setup
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python udf job named demo.py in process mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
> class SubtractOne(ScalarFunction):
> def eval(self, i):
> return i - 1
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
> result_type=DataTypes.BIGINT())
> def add(i, j):
> return i + j
> def main():
> t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
> # process mode !
>
> t_env.get_config().get_configuration().set_string("python.execution-mode",
> "process")
> # optional values
> t_env.get_config().get_configuration().set_string("parallelism.default",
> "2")
> add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
> subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
> t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b',
> 'c'])
> result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
> print(result.to_pandas())
> if __name__ == '__main__':
> main()
> {code}
> * run the python udf job and watch the result
> {code:bash}
> $ python demo.py
> _c0 c _c2
> 0 3 1 1
> 1 7 2 1
> 2 4 3 1
> {code}
> * change the python udf job to multi-thread mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
> class SubtractOne(ScalarFunction):
> def eval(self, i):
> return i - 1
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
> result_type=DataTypes.BIGINT())
> def add(i, j):
> return i + j
> def main():
> t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
> # multi-thread mode
>
> t_env.get_config().get_configuration().set_string("python.execution-mode",
> "multi-thread")
> t_env.get_config().get_configuration().set_string("parallelism.default",
> "2")
> add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
> subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
> t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b',
> 'c'])
> result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
> print(result.to_pandas())
> if __name__ == '__main__':
> main()
> {code}
> * run the python udf job and watch the result
> {code:bash}
> $ python demo.py
> _c0 c _c2
> 0 3 1 1
> 1 7 2 1
> 2 4 3 1
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)