oyugicollins commented on issue #31855:
URL: https://github.com/apache/beam/issues/31855#issuecomment-2458726341

   Below is a section of my code:
   
   ```python
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
   from apache_beam.io.gcp.bigquery import WriteToBigQuery
   from apache_beam.dataframe.convert import to_dataframe, to_pcollection
   import os
   import typing
   
   os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_json_file.json"
   
   class BmsSchema(typing.NamedTuple):
       ident: str
       ain_2: typing.Optional[float]
       can_data_frame_1: typing.Optional[str]
   
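   # Registering a RowCoder lets Beam treat BmsSchema elements as schema'd rows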
   beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)
   
   class ParsePubSubMessage(beam.DoFn):
       def process(self, message):
           import json
           all_columns = ['ident', 'ain_2', 'can_data_frame_1']
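           # Default every expected column to None so missing keys don't raise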
           main_dict = dict(zip(all_columns, [None] * len(all_columns)))
           record = json.loads(message.decode('utf-8'))
           main_dict.update(record)
   
           yield {
               'ident': main_dict["ident"],
               'ain_2': main_dict["ain_2"],
               'can_data_frame_1': main_dict["can_data_frame_1"]
           }
   
   def run():
       options = PipelineOptions(
           project='dwingestion',
           runner='DataflowRunner',
           streaming=True,
           save_main_session=True
       )
   
       options.view_as(StandardOptions).streaming = True
   
       input_subscription = 'projects........../flespi_data_streaming'
   
       table_schema = {
           "fields": [
               {"name": "ident", "type": "STRING", "mode": "NULLABLE"},
               {"name": "ain_2", "type": "FLOAT", "mode": "NULLABLE"},
               {"name": "voltage_mV", "type": "INTEGER", "mode": "NULLABLE"}
           ]
       }
   
       with beam.Pipeline(options=options) as p:
           messages = (p
                       | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                       | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                       | 'Attach Schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                       )
   
           # Convert the messages to a DataFrame within the pipeline context
           df = to_dataframe(messages)
           df['voltage_mV'] = (df['can_data_frame_1'].str[0:4].fillna(0).astype(str).apply(int, base=16)) * 10  # here is where the error is coming from
           df.drop(columns=['can_data_frame_1'], inplace=True)
   
           # Convert the DataFrame back to a PCollection and map it to dictionaries
           transformed_pcol = (
               to_pcollection(df)
               | 'Convert to Dict' >> beam.Map(lambda row: {
                   'ident': row.ident,
                   'ain_2': row.ain_2,
                   'voltage_mV': row.voltage_mV
               })
           )
   
           transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(........
              ...........
           )
   
   if __name__ == '__main__':
       run()
   ```
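
   For reference, the flagged DataFrame line is just a hex-to-integer conversion on the first four characters of `can_data_frame_1`. The same logic expressed in plain Beam, skipping the DataFrame API entirely, would look roughly like the sketch below (it reuses the `messages` PCollection built above; the `add_voltage` name is illustrative):

   ```python
   # Sketch only: per-record equivalent of the flagged DataFrame line.
   def add_voltage(row):
       frame = row.can_data_frame_1
       return {
           'ident': row.ident,
           'ain_2': row.ain_2,
           # fillna(0) in the DataFrame version maps a missing frame to 0
           'voltage_mV': (int(frame[0:4], 16) if frame else 0) * 10,
       }

   transformed_pcol = messages | 'Add voltage' >> beam.Map(add_voltage)
   ```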

   **Full error traceback**
   ```
   Traceback (most recent call last):
     File "C:\Users\coyugi\Documents\watu_etls\bodawerk_etls\bms_test_schema.py", line 86, in <module>
       run()
     File "C:\Users\coyugi\Documents\watu_etls\bodawerk_etls\bms_test_schema.py", line 69, in run
       to_pcollection(df)
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\convert.py", line 255, in to_pcollection
       new_results: Dict[Any, pvalue.PCollection] = {
                                                    ^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 1110, in __ror__
       return self.transform.__ror__(pvalueish, self.label)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 623, in __ror__
       result = p.apply(self, pvalueish, label)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\pipeline.py", line 686, in apply
       return self.apply(transform, pvalueish)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\pipeline.py", line 748, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
       return self.apply_PTransform(transform, input, options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
       return transform.expand(input)
              ^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 151, in expand
       return self._apply_deferred_ops(inputs, self._outputs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 471, in _apply_deferred_ops
       return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
                  ^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
       cache[key] = f(*args, **kwargs)
                    ^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 433, in expr_to_pcoll
       return stage_to_result(expr_to_stage(expr))[expr._id]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
       cache[key] = f(*args, **kwargs)
                    ^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 425, in stage_to_result
       return {expr._id: expr_to_pcoll(expr)
                         ^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
       cache[key] = f(*args, **kwargs)
                    ^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 433, in expr_to_pcoll
       return stage_to_result(expr_to_stage(expr))[expr._id]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 563, in wrapper
       cache[key] = f(*args, **kwargs)
                    ^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\dataframe\transforms.py", line 425, in stage_to_result
       return {expr._id: expr_to_pcoll(expr)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "C:\Users\coyugi\AppData\Local\Programs\Python\Python312\Lib\site-packages\apache_beam\transforms\ptransform.py", line 602, in __ror__
       raise ValueError(
   ValueError: "[ConstantExpression[constant_int_2412180160336]]:2412182918816" requires a pipeline to be specified as there are no deferred inputs.
   ```
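
   For what it's worth, the error message itself says the constant expression "requires a pipeline to be specified", and `to_pcollection` does take an explicit `pipeline` keyword argument. A minimal sketch of that variant, using the same `df` and `p` as above (I have not confirmed this resolves the error):

   ```python
   # Sketch: pass the pipeline explicitly, since the constant expression
   # produced by the transformation has no deferred inputs to infer it from.
   transformed = to_pcollection(df, pipeline=p)
   ```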
   

