[ https://issues.apache.org/jira/browse/FLINK-39724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Feng updated FLINK-39724:
--------------------------------
    Description: 
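See good.py and bad.py in the attached archive.

The only difference between them is that good.py passes the stream through an identity map. This is highly counter-intuitive and appears to be undocumented. The v2.2 documentation even states that the record type will be a Types.ROW_NAMED(...):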
{code:python}
schema = CsvSchema.builder() \
    .add_number_column('id', number_type=DataTypes.BIGINT()) \
    .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
    .set_column_separator(',') \
    .build()

source = FileSource.for_record_stream_format(
    CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()

# the type of record will be Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
{code}
 

Here's bad.py for convenience:
{code:python}
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.table.types import DataTypes
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import WatermarkStrategy

env = StreamExecutionEnvironment.get_execution_environment()

input_path = "./input.csv"

schema = CsvSchema.builder() \
    .add_string_column("type") \
    .add_number_column("value", number_type=DataTypes.DOUBLE()) \
    .set_column_separator(',') \
    .build()

file_source = (
    FileSource
    .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
    .build()
)

ds = env.from_source(
    source=file_source,
    watermark_strategy=WatermarkStrategy.no_watermarks(),
    source_name="csv_source",
)

ds.get_type()  # this fails

# Traceback (most recent call last):
#   File "/home/andrew/Projects/bug_reports/flink/CsvReaderFormat/./main.py", line 29, in <module>
#     ds.get_type() # this fails
#     ^^^^^^^^^^^^^
#   File "/home/andrew/Projects/bug_reports/flink/CsvReaderFormat/.venv/lib/python3.12/site-packages/pyflink/datastream/data_stream.py", line 169, in get_type
#     return typeinfo._from_java_type(self._j_data_stream.getType())
#            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#   File "/home/andrew/Projects/bug_reports/flink/CsvReaderFormat/.venv/lib/python3.12/site-packages/pyflink/common/typeinfo.py", line 1118, in _from_java_type
#     raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)
# TypeError: The java type info: ROW<`type` STRING, `value` DOUBLE>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer) is not supported in PyFlink currently.

# Note: assign_timestamps_and_watermarks (data_stream.py:692) calls get_type,
# so calling ds.assign_timestamps_and_watermarks triggers the same problem.

ds.print()

env.execute()

{code}
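Until this is fixed, the identity-map workaround described above might look roughly like the following. This is a hypothetical sketch, not the attached good.py: the helper name with_python_row_type, the constant CSV_ROW_TYPE and the explicit output_type are assumptions, chosen so the mapped stream carries a RowTypeInfo (which get_type() can convert) instead of the internal RowData type info from the traceback.

{code:python}
from pyflink.common import Types
from pyflink.datastream import DataStream

# Assumption: a Python-side row type mirroring the CsvSchema in bad.py
# ("type" STRING, "value" DOUBLE). Hypothetical name, not a PyFlink API.
CSV_ROW_TYPE = Types.ROW_NAMED(["type", "value"], [Types.STRING(), Types.DOUBLE()])


def with_python_row_type(ds: DataStream) -> DataStream:
    """Hypothetical helper: re-type a CsvReaderFormat stream via an identity map.

    The source stream's Java type is an internal RowData type that
    get_type() rejects; the map's output_type replaces it with a RowTypeInfo.
    """
    return ds.map(lambda row: row, output_type=CSV_ROW_TYPE)
{code}

Applied to bad.py, this would be ds = with_python_row_type(ds) right after env.from_source(...); ds.get_type() and ds.assign_timestamps_and_watermarks(...) should then no longer hit the TypeError. Passing output_type is optional for the workaround itself; without it the map output is typed as pickled bytes, which also avoids the error but loses the named row type.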



> CsvReaderFormat produces a stream which fails get_type() and assign_timestamps_and_watermarks()
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39724
>                 URL: https://issues.apache.org/jira/browse/FLINK-39724
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 2.2.0
>            Reporter: Andrew Feng
>            Priority: Major
>         Attachments: CsvFormatReader.tar.gz
>
>


