[
https://issues.apache.org/jira/browse/BEAM-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Markus Kohler updated BEAM-14228:
---------------------------------
Description:
I am reading data from a Parquet file, and one of the columns is a nullable integer
([https://pandas.pydata.org/docs/user_guide/integer_na.html#integer-na]).
I am not 100% sure I declared it correctly:
{code:python}
import typing
from typing import Dict, Iterable, List, Optional

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class Record(typing.NamedTuple):
    port: Optional[int]
    #port: str

# Public field names of Record, used to select the Parquet columns to read.
recFields=set([i for i in Record.__dict__.keys() if i[:1] != '_'])
beam.coders.registry.register_coder(Record,beam.coders.RowCoder)

def extractDF(tuple):
    # With with_filename=True each element is a (filename, table) pair.
    df=tuple[1].to_pandas()
    print(type(df.port.dtype))
    return df

input_patterns = ['data/*.parquet']
#local runner
options = PipelineOptions(flags=[], type_check_additional='all')

def toRecords(df):
    #df["port"]=None
    return df.to_dict('records')

with beam.Pipeline(options=options) as pipeline:
    lines = (pipeline | 'Create file patterns' >> beam.Create(input_patterns)
             | 'Read Parquet files' >>
               beam.io.ReadAllFromParquetBatched(columns=recFields,with_filename=True)
             | 'Extract DF' >> beam.Map(extractDF )
             | 'To dictionaries' >> beam.FlatMap(toRecords)
             | 'ToRows' >> beam.Map(lambda x: Record(**x)).with_output_types(Record)
             | "print">> beam.Map(print))
{code}
This fails with a type error.
When I uncomment the line in toRecords to set everything to None, it works fine.
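One possible explanation, offered as an assumption and not confirmed anywhere in this report: a pandas nullable integer column marks missing entries with pd.NA, which is not None, so the values coming out of to_dict('records') may not satisfy Optional[int]. The sketch below shows a hypothetical helper, to_record, that normalizes each row before building the Record. It uses only pandas, runs without Beam, and the sample DataFrame is made up for illustration.

```python
import typing
from typing import Optional

import pandas as pd


class Record(typing.NamedTuple):
    port: Optional[int]


def to_record(row: dict) -> Record:
    """Hypothetical helper: map missing markers to None, cast the rest to int."""
    value = row["port"]
    # pd.isna is True for pd.NA, NaN and None alike.
    return Record(port=None if pd.isna(value) else int(value))


# Made-up sample data with one missing entry in a nullable Int64 column.
df = pd.DataFrame({"port": pd.array([80, None, 443], dtype="Int64")})
records = [to_record(row) for row in df.to_dict("records")]
print(records)
```

In the pipeline above, something like this could stand in for the lambda in the 'ToRows' step, though whether it resolves the reported type error has not been verified.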
> Nullable Integer support with pandas not working as expected
> ------------------------------------------------------------
>
> Key: BEAM-14228
> URL: https://issues.apache.org/jira/browse/BEAM-14228
> Project: Beam
> Issue Type: Bug
> Components: dsl-dataframe
> Affects Versions: 2.37.0
> Reporter: Markus Kohler
> Assignee: Brian Hulette
> Priority: P2
> Labels: stale-assigned