# problem_2.py
# .alias() does not help either: the map UDF still sees default field names (f0, f1, ...)
import json
# Build a small streaming table of (id, JSON payload) rows.
# NOTE(review): TableEnvironment, EnvironmentSettings, udf and Row are not
# imported in this file; presumably pyflink-shell pre-loads them — TODO
# confirm, or add the pyflink imports before running this as a plain script.
t_env = TableEnvironment.create(
    EnvironmentSettings.in_streaming_mode()
)
# The schema already names the columns id/data. The .alias() call repeats
# those names deliberately: this repro shows that aliasing does not restore
# the names seen by the map UDF.
table = (
    t_env.from_elements(
        elements=[
            (1, '{"name": "Flink"}'),
            (2, '{"name": "hello"}'),
            (3, '{"name": "world"}'),
            (4, '{"name": "PyFlink"}'),
        ],
        schema=["id", "data"],
    )
    .alias("id", "data")
)
@udf(result_type='Row<id INT, name STRING>')
def example_map(row: Row):
    """Parse the JSON payload of one input row and emit (id, name).

    Fields are read by position rather than by name. With this table, the
    row handed to the map UDF has been observed to carry the default field
    names f0, f1 instead of id, data (even after ``.alias('id', 'data')``),
    so ``row.id`` / ``row.data`` fail. Positional access works either way.

    :param row: input row; assumed to hold (id, JSON string with a "name"
        key) in that order — matches the table's schema order.
    :return: Row(id, name), matching the declared result type.
    """
    print('\n'*3, f'{row=}', '\n'*3)
    # Observed output:
    #   row=Row(f0=1, f1='{"name": "Flink"}')
    # expected:
    #   row=Row(id=1, data='{"name": "Flink"}')
    row_id, payload = row[0], row[1]
    data = json.loads(payload)
    return Row(row_id, data['name'])
# Run the map and print the resulting table.
# With named field access in the UDF (row.id / row.data), this was observed
# to fail with:
#   ValueError: 'data' is not in list
# TableResult.print() returns None, so its result is not bound to a name.
table.map(example_map).execute().print()
On Fri, Oct 27, 2023 at 2:14 PM Alexey Sergeev <[email protected]> wrote:
>
> Hi everyone,
>
>
> The Python Table API seems to be a little buggy.
>
> Here are some minimal examples of the strange behavior:
>
> https://gist.github.com/nrdhm/88322a68fc3e9a14a5f4ab6ec13403cf
>
>
>
> I was testing in pyflink-shell on our small cluster with Flink 1.17.
>
> Docker image: flink:1.17.1-scala_2.12-java11
>
>
>
> The third problem, with the pandas UDF, concerns me the most.
>
>
>
> It seems that vectorized UDFs do not work at all with .filter() / .where()
> calls.
>
> Column names are reset to the defaults f0, f1, …, fN, and values are not
> being filtered.
>
>
> And so, I have some questions:
>
> 1. Were you able to reproduce these problems?
> 2. Is this the expected behavior?
> 3. How can we get around this?
>
> Best regards, Alexey