[ https://issues.apache.org/jira/browse/FLINK-23768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu closed FLINK-23768.
---------------------------
Resolution: Done
Tested it with the following job:
{code}
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (DataTypes, TableDescriptor, Schema,
                           StreamTableEnvironment, EnvironmentSettings)
from pyflink.table.udf import udf


def mixing_use_of_datastream_and_table():
    # use StreamTableEnvironment instead of TableEnvironment when mixing use of
    # the Table API & the DataStream API
    env = StreamExecutionEnvironment.get_execution_environment()
    setting = EnvironmentSettings.new_instance().in_batch_mode().build()
    t_env = StreamTableEnvironment.create(
        stream_execution_environment=env,
        environment_settings=setting)

    # define the source
    t_env.create_temporary_table(
        'source',
        TableDescriptor.for_connector('datagen')
                       .schema(Schema.new_builder()
                               .column('id', DataTypes.BIGINT())
                               .column('data', DataTypes.STRING())
                               .build())
                       .option("number-of-rows", "10")
                       .build())

    # define the sink
    t_env.create_temporary_table(
        'sink',
        TableDescriptor.for_connector('print')
                       .schema(Schema.new_builder()
                               .column('a', DataTypes.BIGINT())
                               .build())
                       .build())

    @udf(result_type=DataTypes.BIGINT())
    def length(data):
        return len(data)

    # perform Table API operations
    table = t_env.from_path("source")
    table = table.select(table.id, length(table.data))

    # convert the table to a DataStream and perform DataStream API operations
    ds = t_env.to_data_stream(table)
    ds = ds.map(lambda i: i[0] + i[1], output_type=Types.LONG())

    # convert the DataStream back to a table and perform Table API operations as you want
    table = t_env.from_data_stream(
        ds,
        Schema.new_builder().column("f0", DataTypes.BIGINT()).build())

    # execute
    table.execute_insert('sink') \
         .wait()
    # remove .wait() if submitting to a remote cluster, refer to
    # https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                        format="%(message)s")
    mixing_use_of_datastream_and_table()
{code}
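For comparison, a minimal sketch (not part of the test job above, same PyFlink version assumed): when a job uses only the Table API and never converts to or from a DataStream, a plain TableEnvironment in batch mode is enough. The to_data_stream/from_data_stream conversions are only available on StreamTableEnvironment, which is why the job above creates one.
{code}
# minimal sketch: pure Table API batch setup, no StreamExecutionEnvironment needed
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().build())

# 'source' and 'sink' could be registered exactly as in the job above;
# only the DataStream conversions require StreamTableEnvironment.
{code}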
Found one issue during testing: FLINK-24097. After resolving it, everything works well.
> Test StreamTableEnvironment batch mode in Python
> ------------------------------------------------
>
> Key: FLINK-23768
> URL: https://issues.apache.org/jira/browse/FLINK-23768
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python, Table SQL / API
> Reporter: Timo Walther
> Assignee: Dian Fu
> Priority: Major
>
> FLINK-20897 enabled batch mode for {{StreamTableEnvironment}}. We should make
> sure that the Python API works as well.