jubebo commented on issue #27166:
URL: https://github.com/apache/beam/issues/27166#issuecomment-1600533589

   Awesome. I took a closer look at this today and implemented a first set of changes.
   
   **This is what I've done**
   - suggested renaming `dict_of_tuples` to `field_names_and_types`
   - added a recursive call to `generate_user_type_from_bq_schema`, as suggested above
   _This change implies that the `field_names_and_types` object passed to `named_fields_to_schema` may already contain a generated named tuple type instead of plain Python types. Therefore:_
   - adjusted the input to `named_fields_to_schema` so that it does not re-parse sequence elements that have already been processed
   - added a new parameter to `named_tuple_from_schema` that overwrites certain positions in the returned named tuple with the already processed elements
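   As a rough, standalone illustration of the recursive idea in the list above, the sketch below builds a nested named tuple type directly from a BigQuery JSON-style schema (a list of dicts with `name`, `type`, `mode` and, for records, `fields`). It does not use Beam: `bq_schema_to_named_tuple` and the `_SCALAR_TYPES` mapping are hypothetical, cover only a few scalar types, and Beam's own type mapping may differ. Each `RECORD`/`STRUCT` field recurses into its own `NamedTuple`, `REPEATED` wraps the result in `Sequence[...]`, and `NULLABLE` (BigQuery's default mode) wraps it in `Optional[...]`.

   ```python
   from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple

   # Hypothetical subset of a BigQuery-to-Python type mapping (assumption);
   # Beam's own mapping may use different types (e.g. numpy ints, Timestamp).
   _SCALAR_TYPES = {
       "STRING": str,
       "INTEGER": int,
       "INT64": int,
       "FLOAT": float,
       "FLOAT64": float,
       "BOOLEAN": bool,
       "BOOL": bool,
       "BYTES": bytes,
   }


   def bq_schema_to_named_tuple(type_name: str, fields: List[Dict[str, Any]]) -> type:
       """Recursively builds a NamedTuple type from a BigQuery JSON-style schema.

       Hypothetical helper, not Beam's generate_user_type_from_bq_schema.
       """
       field_names_and_types: List[Tuple[str, Any]] = []
       for field in fields:
           bq_type = field["type"].upper()
           if bq_type in ("RECORD", "STRUCT"):
               # Nested record: recurse and use the generated NamedTuple as the type.
               python_type = bq_schema_to_named_tuple(
                   f"{type_name}_{field['name']}", field["fields"])
           else:
               python_type = _SCALAR_TYPES[bq_type]
           # BigQuery's default mode is NULLABLE.
           mode = field.get("mode", "NULLABLE").upper()
           if mode == "REPEATED":
               python_type = Sequence[python_type]
           elif mode == "NULLABLE":
               python_type = Optional[python_type]
           field_names_and_types.append((field["name"], python_type))
       return NamedTuple(type_name, field_names_and_types)


   # Example schema with a repeated nested record (field names are illustrative).
   schema = [
       {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
       {"name": "nested", "type": "RECORD", "mode": "REPEATED",
        "fields": [{"name": "nested_lv1_1", "type": "STRING"}]},
   ]
   Row = bq_schema_to_named_tuple("Row", schema)
   print(Row._fields)  # ('id', 'nested')
   ```

   Building the types by hand like this bypasses `named_fields_to_schema` and `named_tuple_from_schema`, so it also skips whatever type checking those two functions perform.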
   
   **Open questions - guidance needed**
   1. Is the PTransform implementation already aware of nested row entries? As far as I understand, the named tuple type returned by `generate_user_type_from_bq_schema` is mapped onto the existing PCollection within the transform's `expand` method.
   2. Is it intended that nested schemas get registered multiple times in this setup? See
      https://github.com/apache/beam/blob/5e942ae3790bc95148413c43ab7e43a01a2d82ae/sdks/python/apache_beam/typehints/schemas.py#L533-L542
   3. Can we drop the calls to `named_fields_to_schema` and `named_tuple_from_schema` in `generate_user_type_from_bq_schema` altogether and create the (nested) named tuple type manually? If so, we would of course lose the type-checking functionality built into these two methods.
   4. Unfortunately, the current implementation still fails. Even when I only add the recursive call to `generate_user_type_from_bq_schema`, the following error occurs. Can you help me understand the issue?
   
   Stack trace
   ```python
   ../../own_packages/apache_beam/pipeline.py:600: in __exit__
       self.result = self.run()
   ../../own_packages/apache_beam/pipeline.py:577: in run
       return self.runner.run_pipeline(self, self._options)
   ../../own_packages/apache_beam/runners/direct/direct_runner.py:129: in run_pipeline
       return runner.run_pipeline(pipeline, options)
   ../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:204: in run_pipeline
       options)
   ../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:224: in run_via_runner_api
       return self.run_stages(stage_context, stages)
   ../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:456: in run_stages
       runner_execution_context, bundle_context_manager, bundle_input)
   ../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:789: in _execute_bundle
       bundle_manager))
   ../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1013: in _run_bundle
       data_input, data_output, input_timers, expected_timer_output)
   ../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1348: in process_bundle
       result_future = self._worker_handler.control_conn.push(process_bundle_req)
   ../../own_packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:379: in push
       response = self.worker.do_instruction(request)
   ../../own_packages/apache_beam/runners/worker/sdk_worker.py:630: in do_instruction
       getattr(request, request_type), request.instruction_id)
   ../../own_packages/apache_beam/runners/worker/sdk_worker.py:667: in process_bundle
       bundle_processor.process_bundle(instruction_id))
   ../../own_packages/apache_beam/runners/worker/bundle_processor.py:1062: in process_bundle
       element.data)
   ../../own_packages/apache_beam/runners/worker/bundle_processor.py:231: in process_encoded
       self.output(decoded_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:528: in output
       _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
       self.consumer.process(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:1031: in process
       o)
   ../../own_packages/apache_beam/runners/common.py:1436: in process_with_sized_restriction
       watermark_estimator_state=estimator_state)
   ../../own_packages/apache_beam/runners/common.py:819: in invoke_process
       windowed_value, additional_args, additional_kwargs)
   ../../own_packages/apache_beam/runners/common.py:985: in _invoke_process_per_window
       self.threadsafe_watermark_estimator)
   ../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
       self._write_value_to_tag(tag, windowed_value, watermark_estimator)
   ../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
       self.main_receivers.receive(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
       self.consumer.process(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:908: in process
       delayed_applications = self.dofn_runner.process(o)
   ../../own_packages/apache_beam/runners/common.py:1420: in process
       self._reraise_augmented(exn)
   ../../own_packages/apache_beam/runners/common.py:1492: in _reraise_augmented
       raise exn
   ../../own_packages/apache_beam/runners/common.py:1418: in process
       return self.do_fn_invoker.invoke_process(windowed_value)
   ../../own_packages/apache_beam/runners/common.py:625: in invoke_process
       windowed_value, self.process_method(windowed_value.value))
   ../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
       self._write_value_to_tag(tag, windowed_value, watermark_estimator)
   ../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
       self.main_receivers.receive(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
       self.consumer.process(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:908: in process
       delayed_applications = self.dofn_runner.process(o)
   ../../own_packages/apache_beam/runners/common.py:1420: in process
       self._reraise_augmented(exn)
   ../../own_packages/apache_beam/runners/common.py:1508: in _reraise_augmented
       raise new_exn.with_traceback(tb)
   ../../own_packages/apache_beam/runners/common.py:1418: in process
       return self.do_fn_invoker.invoke_process(windowed_value)
   ../../own_packages/apache_beam/runners/common.py:625: in invoke_process
       windowed_value, self.process_method(windowed_value.value))
   ../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
       self._write_value_to_tag(tag, windowed_value, watermark_estimator)
   ../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
       self.main_receivers.receive(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:239: in receive
       self.update_counters_start(windowed_value)
   ../../own_packages/apache_beam/runners/worker/operations.py:198: in update_counters_start
       self.opcounter.update_from(windowed_value)
   ../../own_packages/apache_beam/runners/worker/opcounters.py:213: in update_from
       self.do_sample(windowed_value)
   ../../own_packages/apache_beam/runners/worker/opcounters.py:265: in do_sample
       self.coder_impl.get_estimated_size_and_observables(windowed_value))
   ../../own_packages/apache_beam/coders/coder_impl.py:1507: in get_estimated_size_and_observables
       value.value, nested=nested))
   ../../own_packages/apache_beam/coders/coder_impl.py:209: in get_estimated_size_and_observables
       return self.estimate_size(value, nested), []
   ../../own_packages/apache_beam/coders/coder_impl.py:248: in estimate_size
       self.encode_to_stream(value, out, nested)
   ../../own_packages/apache_beam/coders/coder_impl.py:1769: in encode_to_stream
       component_coder.encode_to_stream(attr, out, True)
   ../../own_packages/apache_beam/coders/coder_impl.py:1735: in encode_to_stream
       attrs = [getattr(value, name) for name in self.field_names]
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

   .0 = <list_iterator object at 0x7fe1f6b25610>

   >   attrs = [getattr(value, name) for name in self.field_names]
   E   AttributeError: 'list' object has no attribute 'nested_lv1_1' [while running 'QueryTable/ParDo(BeamSchemaConversionDoFn)']
   ```
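   The last frames of the trace show `encode_to_stream` (`coder_impl.py:1769`) handing a component value to a nested coder that calls `getattr(value, name)` for `nested_lv1_1`, and that value is a plain `list`. One possible reading, not verified, is that the nested field is declared as a single nested row while the runtime value is a list; if the field is a REPEATED RECORD, it may be worth checking that the generated type wraps the nested row in a sequence and that nested values are converted to row objects. The sketch below shows such a recursive conversion in plain Python, assuming records arrive as dicts and repeated fields as lists. `to_row`, `Nested` and `Row` are hypothetical names and this is not how Beam's `BeamSchemaConversionDoFn` works.

   ```python
   import collections.abc
   import typing
   from typing import Any, NamedTuple, Optional, Sequence


   def _is_named_tuple_type(tp: Any) -> bool:
       """Returns True for classes created via typing.NamedTuple or collections.namedtuple."""
       return isinstance(tp, type) and issubclass(tp, tuple) and hasattr(tp, "_fields")


   def to_row(value: Any, tp: Any) -> Any:
       """Hypothetical helper: recursively converts dicts (records) and lists
       (repeated fields) into instances of a possibly nested NamedTuple type."""
       if value is None:
           return None
       origin = typing.get_origin(tp)
       if origin is typing.Union:
           # Optional[X] is Union[X, None]; assumes exactly one non-None member.
           (inner,) = [arg for arg in typing.get_args(tp) if arg is not type(None)]
           return to_row(value, inner)
       if origin in (list, collections.abc.Sequence):
           # Repeated field: convert every element using the element type.
           (item_type,) = typing.get_args(tp)
           return [to_row(item, item_type) for item in value]
       if _is_named_tuple_type(tp):
           # Record: keys missing from the dict become None via dict.get.
           hints = tp.__annotations__
           return tp(**{name: to_row(value.get(name), hints[name]) for name in tp._fields})
       # Scalar: passed through unchanged.
       return value


   # Hypothetical types reusing the field name from the trace.
   class Nested(NamedTuple):
       nested_lv1_1: Optional[str]


   class Row(NamedTuple):
       id: int
       nested: Sequence[Nested]


   row = to_row({"id": 1, "nested": [{"nested_lv1_1": "a"}, {}]}, Row)
   print(row.nested[0].nested_lv1_1)  # prints: a
   ```

   Here `row.nested` is a list of `Nested` instances, so `row.nested[0].nested_lv1_1` works, whereas calling `getattr` with `'nested_lv1_1'` on the raw list raises the same kind of `AttributeError` as in the trace.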

