ffernandez92 commented on issue #28664:
URL: https://github.com/apache/beam/issues/28664#issuecomment-1743609326

   Thanks for the KafkaProvider, @robertwb. I ran a quick test, adding the
   necessary elements to `standard_io.yaml`:
   
   ```
     transforms:
   (...)
       'ReadFromKafka': 'ReadFromKafka'
   
         'ReadFromKafka':
           'schema': 'schema'
           'hash_code': 'hashCode'
           'consumer_config': 'consumerConfigUpdates'
           'format': 'format'
           'topic': 'topic'
           'bootstrap_servers': 'bootstrapServers'
           'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
   (...)
   
           'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'
   ```
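   
   If it helps to see it spelled out, my understanding is that the `mappings` entry above just renames the
   snake_case YAML config keys to the camelCase field names the schematransform expects. A tiny standalone
   sketch (not Beam code, just the renaming as I understand it):
   
   ```
   mappings = {
       'schema': 'schema',
       'hash_code': 'hashCode',
       'consumer_config': 'consumerConfigUpdates',
       'format': 'format',
       'topic': 'topic',
       'bootstrap_servers': 'bootstrapServers',
       'confluent_schema_registry_url': 'confluentSchemaRegistryUrl',
   }
   
   def remap(config):
       # Rename the YAML keys to the camelCase field names; unknown keys pass through.
       return {mappings.get(k, k): v for k, v in config.items()}
   
   print(remap({'topic': 'test_topic', 'bootstrap_servers': 'localhost:9092'}))
   # {'topic': 'test_topic', 'bootstrapServers': 'localhost:9092'}
   ```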
   
   As a test, I ran the following YAML:
   
   ```
   pipeline:
     transforms:
       - type: ReadFromKafka
         config:
           topic: test_topic
           format: JSON
           hash_code: 1
           bootstrap_servers: "localhost:9092"
           schema: '{ "type": "record", "name": "AdEvent", "fields": [{"name": "media_type", "type": "string"},{"name": "ad_type", "type": "int"}]}'
   ```
   
   But it shows the following error:
   ```
   Building pipeline...
   INFO:apache_beam.yaml.yaml_transform:Expanding "ReadFromKafka" at line 3
   Traceback (most recent call last):
     File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
       return _run_code(code, main_globals, None,
     File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
       exec(code, run_globals)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/main.py", line 75, in <module>
       run()
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/main.py", line 68, in run
       yaml_transform.expand_pipeline(p, pipeline_spec)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 970, in expand_pipeline
       return YamlTransform(
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 944, in expand
       result = expand_transform(
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 424, in expand_transform
       return expand_composite_transform(spec, scope)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 498, in expand_composite_transform
       return CompositePTransform.expand(None)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 489, in expand
       inner_scope.compute_all()
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 218, in compute_all
       self.compute_outputs(transform_id)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 68, in wrapper
       self._cache[key] = func(self, *args)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 252, in compute_outputs
       return expand_transform(self._transforms_by_uuid[transform_id], self)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 426, in expand_transform
       return expand_leaf_transform(spec, scope)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 448, in expand_leaf_transform
       ptransform = scope.create_ptransform(spec, inputs_dict.values())
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 351, in create_ptransform
       provider.requires_inputs(spec['type'], config)):
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_provider.py", line 719, in requires_inputs
       return self._underlying_provider.requires_inputs(typ, args)
     File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_provider.py", line 172, in requires_inputs
       if self._urns[type] in self.schema_transforms():
   KeyError: <class 'type'>
   
   ``` 
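   Reading the last frame, `requires_inputs` seems to index `self._urns` with the Python builtin `type`
   rather than its `typ` argument, which would produce exactly `KeyError: <class 'type'>` no matter what
   transform name is passed in. A minimal standalone sketch (not the actual Beam code) of that failure mode:
   
   ```
   # A dict keyed by transform-type strings, as in my standard_io.yaml entry above.
   _urns = {'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'}
   
   def requires_inputs_buggy(typ, args):
       # `type` here is the builtin class, not the `typ` argument, so this raises
       # KeyError: <class 'type'> regardless of the value of `typ`.
       return _urns[type]
   
   def requires_inputs_fixed(typ, args):
       # Indexing with the string parameter resolves the URN as expected.
       return _urns[typ]
   
   print(requires_inputs_fixed('ReadFromKafka', {}))
   # beam:schematransform:org.apache.beam:kafka_read:v1
   ```
   
   If I'm reading it right, using `typ` there would avoid the KeyError, but I may be missing something else.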
   Most likely I'm missing something. When I registered the transform as "full python" under
   `'ReadFromKafka': 'apache_beam.yaml.yaml_io.read_from_kafka'`, it worked fine, although it was limited
   to the raw format only. The only reason I tried the approach above is that I wanted to explore expanding
   the Java option (which I think is the right way to go).

