oyugicollins commented on issue #31855:
URL: https://github.com/apache/beam/issues/31855#issuecomment-2458726341
Below is a section of my code:
```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
import os
import typing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_json_file.json"


class BmsSchema(typing.NamedTuple):
    ident: str
    ain_2: typing.Optional[float]
    can_data_frame_1: typing.Optional[str]


beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json

        # Start every record with all expected columns set to None, then
        # overlay whatever fields the Pub/Sub message actually contains.
        all_columns = ['ident', 'ain_2', 'can_data_frame_1']
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)
        yield {
            'ident': main_dict["ident"],
            'ain_2': main_dict["ain_2"],
            'can_data_frame_1': main_dict["can_data_frame_1"]
        }


def run():
    options = PipelineOptions(
        project='dwingestion',
        runner='DataflowRunner',
        streaming=True,
        save_main_session=True
    )
    options.view_as(StandardOptions).streaming = True

    input_subscription = 'projects........../flespi_data_streaming'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"},
            {"name": "ain_2", "type": "FLOAT", "mode": "NULLABLE"},
            {"name": "voltage_mV", "type": "INTEGER", "mode": "NULLABLE"}
        ]
    }

    with beam.Pipeline(options=options) as p:
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attach Schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    )

        # Convert the messages to a DataFrame within the pipeline context
        df = to_dataframe(messages)

        # Parse the first four hex characters as an integer and scale by 10.
        # Here is where the error is coming from:
        df['voltage_mV'] = (df['can_data_frame_1'].str[0:4].fillna(0).astype(str).apply(int, base=16)) * 10
        df.drop(columns=['can_data_frame_1'], inplace=True)

        # Convert the DataFrame back to a PCollection and map it to dictionaries
        transformed_pcol = (
            to_pcollection(df)
            | 'Convert to Dict' >> beam.Map(lambda row: {
                'ident': row.ident,
                'ain_2': row.ain_2,
                'voltage_mV': row.voltage_mV
            })
        )

        transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(........
            ...........
        )


if __name__ == '__main__':
    run()
```
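For context, the flagged line just reinterprets the first four hex characters of `can_data_frame_1` as an integer and scales it by 10. Here is a minimal standalone sketch of that same conversion written as a plain `beam.Map` step instead of a DataFrame column assignment; the `add_voltage` helper is hypothetical and only illustrates the intended arithmetic, not a confirmed fix:

```
import apache_beam as beam

def add_voltage(record):
    # First four hex characters -> integer (e.g. '1f40' -> 8000), scaled by 10;
    # a missing frame maps to 0, mirroring the fillna(0) in the pipeline above.
    frame = record.get('can_data_frame_1')
    record['voltage_mV'] = int(frame[:4], 16) * 10 if frame else 0
    record.pop('can_data_frame_1', None)
    return record

with beam.Pipeline() as p:
    (p
     | beam.Create([{'ident': 'a', 'ain_2': 1.2, 'can_data_frame_1': '1f40aa'}])
     | beam.Map(add_voltage)
     | beam.Map(print))  # {'ident': 'a', 'ain_2': 1.2, 'voltage_mV': 80000}
```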
**Full Error Traceback**
```
Traceback (most recent call last):
  File "C:\Users\coyugi\Documents\watu_etls\bodawerk_etls\bms_test_schema.py", line 86, in <module>
    run()
  File "C:\Users\coyugi\Documents\watu_etls\bodawerk_etls\bms_test_schema.py", line 69, in run
    to_pcollection(df)
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\convert.py", line 255, in to_pcollection
    new_results: Dict[Any, pvalue.PCollection] = {
                                                 ^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 1110, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 623, in __ror__
    result = p.apply(self, pvalueish, label)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\pipeline.py", line 686, in apply
    return self.apply(transform, pvalueish)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\pipeline.py", line 748, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
    return self.apply_PTransform(transform, input, options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 151, in expand
    return self._apply_deferred_ops(inputs, self._outputs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 471, in _apply_deferred_ops
    return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
               ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
    cache[key] = f(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 433, in expr_to_pcoll
    return stage_to_result(expr_to_stage(expr))[expr._id]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
    cache[key] = f(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 425, in stage_to_result
    return {expr._id: expr_to_pcoll(expr)
                      ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
    cache[key] = f(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 433, in expr_to_pcoll
    return stage_to_result(expr_to_stage(expr))[expr._id]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
    cache[key] = f(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 425, in stage_to_result
    return {expr._id: expr_to_pcoll(expr)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 602, in __ror__
    raise ValueError(
ValueError: "[ConstantExpression[constant_int_2412180160336]]:2412182918816"
requires a pipeline to be specified as there are no deferred inputs.
```
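Not sure if it helps, but the last line of the error suggests one thing to try: `to_pcollection` accepts a `pipeline` keyword argument, so passing the pipeline explicitly may satisfy the `ConstantExpression` (presumably introduced by the literal `0` in `fillna(0)`), which has no deferred inputs to infer a pipeline from. A hedged sketch, not a confirmed fix:

```
# Untested guess at a workaround inside run(): pass the pipeline explicitly so
# expressions with no deferred inputs (the constant from fillna(0)) can resolve it.
transformed_pcol = (
    to_pcollection(df, pipeline=p)
    | 'Convert to Dict' >> beam.Map(lambda row: {
        'ident': row.ident,
        'ain_2': row.ain_2,
        'voltage_mV': row.voltage_mV
    })
)
```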