jubebo commented on issue #27166: URL: https://github.com/apache/beam/issues/27166#issuecomment-1600533589
Awesome. I took a closer look at this today and implemented a first set of changes.

**What I've done**

- Suggested renaming `dict_of_tuples` to `field_names_and_types`.
- Added the recursive call to `generate_user_type_from_bq_schema`, as suggested above.

_This change means that the `field_names_and_types` object passed to `named_fields_to_schema` may already contain named tuple types instead of plain Python types. Therefore I:_

- adjusted the input to `named_fields_to_schema` so that it does not re-parse sequence elements that have already been processed;
- added a new parameter to `named_tuple_from_schema` that overwrites certain positions in the returned named tuple with the already processed elements.

**Open questions (guidance needed)**

1. Is the PTransform implementation already aware of nested row entries? As far as I understand, the named tuple returned by `generate_user_type_from_bq_schema` is mapped onto the existing PCollection within the transform's `expand` method.
2. Is it intended that nested schemas get registered multiple times in this setup? See https://github.com/apache/beam/blob/5e942ae3790bc95148413c43ab7e43a01a2d82ae/sdks/python/apache_beam/typehints/schemas.py#L533-L542
3. Can we drop the calls to `named_fields_to_schema` and `named_tuple_from_schema` in `generate_user_type_from_bq_schema` altogether and build the (nested) named tuple manually? If so, we would of course lose the type-checking functionality built into these two methods.
4. Unfortunately, the current implementation does not yet run without errors. Even when I only add the recursive call to `generate_user_type_from_bq_schema`, the following error occurs. Can you help me understand the issue?
Stack trace

```python
../../own_packages/apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
../../own_packages/apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
../../own_packages/apache_beam/runners/direct/direct_runner.py:129: in run_pipeline
    return runner.run_pipeline(pipeline, options)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:204: in run_pipeline
    options)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:224: in run_via_runner_api
    return self.run_stages(stage_context, stages)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:456: in run_stages
    runner_execution_context, bundle_context_manager, bundle_input)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:789: in _execute_bundle
    bundle_manager))
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1013: in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1348: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
../../own_packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:379: in push
    response = self.worker.do_instruction(request)
../../own_packages/apache_beam/runners/worker/sdk_worker.py:630: in do_instruction
    getattr(request, request_type), request.instruction_id)
../../own_packages/apache_beam/runners/worker/sdk_worker.py:667: in process_bundle
    bundle_processor.process_bundle(instruction_id))
../../own_packages/apache_beam/runners/worker/bundle_processor.py:1062: in process_bundle
    element.data)
../../own_packages/apache_beam/runners/worker/bundle_processor.py:231: in process_encoded
    self.output(decoded_value)
../../own_packages/apache_beam/runners/worker/operations.py:528: in output
    _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
    self.consumer.process(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:1031: in process
    o)
../../own_packages/apache_beam/runners/common.py:1436: in process_with_sized_restriction
    watermark_estimator_state=estimator_state)
../../own_packages/apache_beam/runners/common.py:819: in invoke_process
    windowed_value, additional_args, additional_kwargs)
../../own_packages/apache_beam/runners/common.py:985: in _invoke_process_per_window
    self.threadsafe_watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
    self._write_value_to_tag(tag, windowed_value, watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
    self.main_receivers.receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
    self.consumer.process(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:908: in process
    delayed_applications = self.dofn_runner.process(o)
../../own_packages/apache_beam/runners/common.py:1420: in process
    self._reraise_augmented(exn)
../../own_packages/apache_beam/runners/common.py:1492: in _reraise_augmented
    raise exn
../../own_packages/apache_beam/runners/common.py:1418: in process
    return self.do_fn_invoker.invoke_process(windowed_value)
../../own_packages/apache_beam/runners/common.py:625: in invoke_process
    windowed_value, self.process_method(windowed_value.value))
../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
    self._write_value_to_tag(tag, windowed_value, watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
    self.main_receivers.receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
    self.consumer.process(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:908: in process
    delayed_applications = self.dofn_runner.process(o)
../../own_packages/apache_beam/runners/common.py:1420: in process
    self._reraise_augmented(exn)
../../own_packages/apache_beam/runners/common.py:1508: in _reraise_augmented
    raise new_exn.with_traceback(tb)
../../own_packages/apache_beam/runners/common.py:1418: in process
    return self.do_fn_invoker.invoke_process(windowed_value)
../../own_packages/apache_beam/runners/common.py:625: in invoke_process
    windowed_value, self.process_method(windowed_value.value))
../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
    self._write_value_to_tag(tag, windowed_value, watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
    self.main_receivers.receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:239: in receive
    self.update_counters_start(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:198: in update_counters_start
    self.opcounter.update_from(windowed_value)
../../own_packages/apache_beam/runners/worker/opcounters.py:213: in update_from
    self.do_sample(windowed_value)
../../own_packages/apache_beam/runners/worker/opcounters.py:265: in do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
../../own_packages/apache_beam/coders/coder_impl.py:1507: in get_estimated_size_and_observables
    value.value, nested=nested))
../../own_packages/apache_beam/coders/coder_impl.py:209: in get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
../../own_packages/apache_beam/coders/coder_impl.py:248: in estimate_size
    self.encode_to_stream(value, out, nested)
../../own_packages/apache_beam/coders/coder_impl.py:1769: in encode_to_stream
    component_coder.encode_to_stream(attr, out, True)
../../own_packages/apache_beam/coders/coder_impl.py:1735: in encode_to_stream
    attrs = [getattr(value, name) for name in self.field_names]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

.0 = <list_iterator object at 0x7fe1f6b25610>

>   attrs = [getattr(value, name) for name in self.field_names]
E   AttributeError: 'list' object has no attribute 'nested_lv1_1' [while running 'QueryTable/ParDo(BeamSchemaConversionDoFn)']
```
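For discussion, here is a minimal, self-contained sketch of the recursive idea and of question 3 (building the nested named tuple types manually). It assumes BigQuery schemas are given as plain dicts in the JSON `TableSchema` layout (`name`, `type`, `mode`, `fields`) and uses only `typing.NamedTuple` instead of Beam's `named_fields_to_schema`/`named_tuple_from_schema`, so it has none of their type checking and registers no schemas. `user_type_from_schema`, `to_user_type`, `_BQ_TO_PY` and the example schema are hypothetical names, not Beam APIs; the field names are only modeled on the attribute in the error message, since the actual test schema isn't shown. It also illustrates one possible (unconfirmed) cause of the error above: a `REPEATED` `RECORD` column arrives as a list of rows, so if the generated type describes it as a single nested row, or the nested dicts are never converted, the row coder ends up calling `getattr(value, name)` on a list.

```python
import typing
from typing import Any, Dict, List, Optional

# Hypothetical mapping from BigQuery column types to Python types (not exhaustive).
_BQ_TO_PY = {
    "STRING": str,
    "INTEGER": int,
    "INT64": int,
    "FLOAT": float,
    "FLOAT64": float,
    "BOOLEAN": bool,
    "BOOL": bool,
    "BYTES": bytes,
}


def user_type_from_schema(fields: List[Dict[str, Any]], type_name: str = "Row") -> type:
    """Recursively build a NamedTuple type from BigQuery-style field dicts (hypothetical)."""
    field_names_and_types = []
    for field in fields:
        if field["type"] in ("RECORD", "STRUCT"):
            # Recurse: each nested record becomes its own NamedTuple type.
            py_type = user_type_from_schema(
                field["fields"], type_name=f"{type_name}_{field['name']}")
        else:
            py_type = _BQ_TO_PY[field["type"]]
        mode = field.get("mode", "NULLABLE")  # BigQuery's default mode is NULLABLE
        if mode == "REPEATED":
            py_type = List[py_type]      # a repeated column holds a list of values
        elif mode == "NULLABLE":
            py_type = Optional[py_type]
        field_names_and_types.append((field["name"], py_type))
    return typing.NamedTuple(type_name, field_names_and_types)


def to_user_type(user_type: type, fields: List[Dict[str, Any]], row: Dict[str, Any]):
    """Recursively convert a dict row into `user_type`, including nested records."""
    values = []
    for field in fields:
        value = row.get(field["name"])
        if field["type"] in ("RECORD", "STRUCT") and value is not None:
            annotation = user_type.__annotations__[field["name"]]
            args = typing.get_args(annotation)  # unwraps List[X] / Optional[X]
            nested_type = args[0] if args else annotation
            if field.get("mode") == "REPEATED":
                # A REPEATED RECORD is a list of dicts: convert every element.
                value = [to_user_type(nested_type, field["fields"], v) for v in value]
            else:
                value = to_user_type(nested_type, field["fields"], value)
        values.append(value)
    return user_type(*values)


# Hypothetical schema with one REPEATED RECORD column.
SCHEMA = [
    {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "nested_lv1", "type": "RECORD", "mode": "REPEATED", "fields": [
        {"name": "nested_lv1_1", "type": "STRING", "mode": "NULLABLE"},
    ]},
]

Row = user_type_from_schema(SCHEMA)
example = to_user_type(
    Row, SCHEMA,
    {"id": 1, "nested_lv1": [{"nested_lv1_1": "a"}, {"nested_lv1_1": "b"}]})
print(example)
```

Here `example.nested_lv1` is a list of `Row_nested_lv1` tuples, so `example.nested_lv1[0].nested_lv1_1` works, while `getattr(example.nested_lv1, "nested_lv1_1")` raises the same kind of `AttributeError` as in the trace.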
