[
https://issues.apache.org/jira/browse/FLINK-22728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348967#comment-17348967
]
JYXL commented on FLINK-22728:
------------------------------
Hi Dian,
I used the method noted in the URL and created a test project as follows:
project:
|-- __init__.py
|-- filter.py
|-- main_task.py
|-- target.py
The code is as follows:
filter.py
from pyflink.datastream.functions import FilterFunction
# from project.target import Target  # --1

class MyFilterFunction(FilterFunction):
    # def open(self, runtime_context):
    #     from project.target import Target  # --2
    def filter(self, value):
        from project.target import Target  # --3
        return value[-1] == Target.TRUE
target.py
import enum

class Target(enum.Enum):
    TRUE = '1'
    FALSE = '0'
main_task.py
import os
import sys

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Get the project's parent directory.
env.add_python_file(
    os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
env.set_python_executable(sys.executable)
# KafkaConsumer is defined elsewhere (not shown here).
kafka_consumer = KafkaConsumer()
source_ds = env.add_source(source_func=kafka_consumer,
                           source_name="kafka source",
                           type_info=kafka_consumer.get_type_info())
source_ds.print('source: ')
filter_ds = source_ds.filter(MyFilterFunction())
filter_ds.print('filter: ')
env.execute('test')
When I place the import at position 1 or 3, the console output is as follows:
source: :8> +I[0, pyflink, 1, 2, 1621567628127, 1621567628127, 1, 1]
source: :8> +I[1, pyflink, 1, 2, 1621567628132, 1621567628132, 2, 1]
source: :8> +I[2, pyflink, 1, 2, 1621567628132, 1621567628132, 3, 1]
When I place the import at position 2, the console reports the following error:
NameError: name 'Target' is not defined
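A likely reason for the NameError at position 2 is ordinary Python scoping: an import inside open() binds Target as a local name of open(), so filter() cannot see it. The sketch below reproduces this without PyFlink. The module name target_standin and both class names are hypothetical stand-ins, and the comparison uses .value only so that the example returns a meaningful result.

```python
import enum
import sys
import types


def _install_standin_module():
    """Register a hypothetical `target_standin` module that mirrors target.py."""
    module = types.ModuleType("target_standin")

    class Target(enum.Enum):
        TRUE = '1'
        FALSE = '0'

    module.Target = Target
    sys.modules["target_standin"] = module


_install_standin_module()


class ImportInOpenOnly:
    """Mirrors position 2: Target is bound only as a local name of open()."""

    def open(self, runtime_context):
        from target_standin import Target  # local to open(), gone on return

    def filter(self, value):
        # Target is neither local nor global here, so this raises NameError.
        return value[-1] == Target.TRUE.value


class ImportStoredOnSelf:
    """Keeps the import in open() but stores the class on the instance."""

    def open(self, runtime_context):
        from target_standin import Target
        self.target = Target

    def filter(self, value):
        return value[-1] == self.target.TRUE.value


def raises_name_error(func, *args):
    """Return True if calling func(*args) raises NameError."""
    try:
        func(*args)
    except NameError:
        return True
    return False
```

If the import has to stay in open(), keeping a reference on self (as in the second class) is one way to make it reachable from filter().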
Then I changed filter.py as follows:
from pyflink.datastream.functions import FilterFunction

class MyFilterFunction(FilterFunction):
    def filter(self, value):
        return value[-1] == "1"
The console output is as follows:
source: :8> +I[0, pyflink, 1, 2, 1621572916880, 1621572916880, 1, 1]
source: :8> +I[1, pyflink, 1, 2, 1621572916885, 1621572916885, 2, 1]
source: :8> +I[2, pyflink, 1, 2, 1621572916885, 1621572916885, 3, 1]
filter: :8> +I[0, pyflink, 1, 2, 1621572916880, 1621572916880, 1, 1]
filter: :8> +I[1, pyflink, 1, 2, 1621572916885, 1621572916885, 2, 1]
filter: :8> +I[2, pyflink, 1, 2, 1621572916885, 1621572916885, 3, 1]
That is the output I need.
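One possible explanation for the missing "filter:" lines with the import at position 1 or 3, not confirmed in this thread, is that a plain enum.Enum member never compares equal to its underlying value. Since the comparison with "1" works, the last field appears to be a string, so value[-1] == Target.TRUE would always be False. The sketch below shows the difference in plain Python. The row contents and the StrTarget class are made up for illustration.

```python
import enum


class Target(enum.Enum):
    TRUE = '1'
    FALSE = '0'


class StrTarget(str, enum.Enum):
    """Hypothetical variant: mixing in str makes members compare like strings."""
    TRUE = '1'
    FALSE = '0'


# Hypothetical record whose last field is the string '1'.
row = [0, 'pyflink', '1']

# Comparing a str with a plain Enum member is always False.
member_match = row[-1] == Target.TRUE

# Comparing with the member's value compares str with str.
value_match = row[-1] == Target.TRUE.value

# Looking the member up by value and comparing identity also works.
lookup_match = Target(row[-1]) is Target.TRUE

# With the str mixin, the member itself equals the string.
mixin_match = row[-1] == StrTarget.TRUE
```

Under this assumption, changing the filter to compare against Target.TRUE.value would keep the enum while producing the same result as the comparison with "1".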
> a problem of loading udf
> ------------------------
>
> Key: FLINK-22728
> URL: https://issues.apache.org/jira/browse/FLINK-22728
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.13.0
> Environment: python3.7
> centos 8
> pyflink1.13.0
> java1.11
> Reporter: JYXL
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Hi,
> I'm using a streaming UDF written in Python.
> The UDF is as below:
>
> class MyKeySelector(KeySelector):
>     def __init__(self, partitions: int = 6):
>         self.partitions = partitions
>     def get_key(self, value):
>         return random.randint(0, self.partitions)
>
> When I define it in the same script as the main task, it works,
> but when I put it in a separate script, it does not work.
> The project layout is as below:
>
> project:
> | __init__.py
> | key_function.py
> | main_task.py
>
> I'm confused: even when I use the env.add_python_file method, it does not work,
> whether the parameter `file_path` is '~/project' or
> '~/project/key_function.py'.