[ 
https://issues.apache.org/jira/browse/FLINK-24003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24003:
-----------------------------------
    Labels: pull-request-available  (was: )

> Lookback mode doesn't work when mixing use of Python Table API and Python 
> DataStream API
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24003
>                 URL: https://issues.apache.org/jira/browse/FLINK-24003
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.14.0
>            Reporter: Dian Fu
>            Assignee: Huang Xingbo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> For the following program:
> {code}
> import logging
> import time
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
> from pyflink.table import StreamTableEnvironment, DataTypes, Schema
> def test_chaining():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env)
>     
> t_env.get_config().get_configuration().set_boolean("python.operator-chaining.enabled",
>  False)
>     # 1. create source Table
>     t_env.execute_sql("""
>         CREATE TABLE datagen (
>             id INT,
>             data STRING
>         ) WITH (
>             'connector' = 'datagen',
>             'rows-per-second' = '1000000',
>             'fields.id.kind' = 'sequence',
>             'fields.id.start' = '1',
>             'fields.id.end' = '1000'
>         )
>     """)
>     # 2. create sink Table
>     t_env.execute_sql("""
>         CREATE TABLE print (
>             id BIGINT,
>             data STRING,
>             flag STRING
>         ) WITH (
>             'connector' = 'blackhole'
>         )
>     """)
>     t_env.execute_sql("""
>         CREATE TABLE print_2 (
>             id BIGINT,
>             data STRING,
>             flag STRING
>         ) WITH (
>             'connector' = 'blackhole'
>         )
>     """)
>     # 3. query from source table and perform calculations
>     # create a Table from a Table API query:
>     source_table = t_env.from_path("datagen")
>     ds = t_env.to_append_stream(
>         source_table,
>         Types.ROW([Types.INT(), Types.STRING()]))
>     ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
>     ds2 = ds.map(lambda i: (i[0], i[1][2:]))
>     class MyCoMapFunction(CoMapFunction):
>         def map1(self, value):
>             print('hahah')
>             return value
>         def map2(self, value):
>             print('hahah')
>             return value
>     ds3 = ds1.connect(ds2).map(MyCoMapFunction(), 
> output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))
>     ds4 = ds3.map(lambda i: (i[0], i[1], "left"),
>                   output_type=Types.TUPLE([Types.LONG(), Types.STRING(), 
> Types.STRING()]))
>     ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\
>              .map(lambda i: i,
>                   output_type=Types.TUPLE([Types.LONG(), Types.STRING(), 
> Types.STRING()]))
>     schema = Schema.new_builder() \
>         .column("f0", DataTypes.BIGINT()) \
>         .column("f1", DataTypes.STRING()) \
>         .column("f2", DataTypes.STRING()) \
>         .build()
>     result_table_3 = t_env.from_data_stream(ds4, schema)
>     statement_set = t_env.create_statement_set()
>     statement_set.add_insert("print", result_table_3)
>     result_table_4 = t_env.from_data_stream(ds5, schema)
>     statement_set.add_insert("print_2", result_table_4)
>     statement_set.execute().wait()
> if __name__ == "__main__":
>     start_ts = time.time()
>     test_chaining()
>     end_ts = time.time()
>     print("--- %s seconds ---" % (end_ts - start_ts))
> {code}
> Lookback mode doesn't work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to