ffernandez92 commented on issue #28664:
URL: https://github.com/apache/beam/issues/28664#issuecomment-1743609326
Thanks for the KafkaProvider, @robertwb. I ran a quick test adding the
necessary elements to `standard_io.yaml`:
```
transforms:
  (...)
  'ReadFromKafka': 'ReadFromKafka'
(...)
'ReadFromKafka':
  'schema': 'schema'
  'hash_code': 'hashCode'
  'consumer_config': 'consumerConfigUpdates'
  'format': 'format'
  'topic': 'topic'
  'bootstrap_servers': 'bootstrapServers'
  'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
(...)
'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'
```
As a test, I ran the following YAML:
```
pipeline:
  transforms:
    - type: ReadFromKafka
      config:
        topic: test_topic
        format: JSON
        hash_code: 1
        bootstrap_servers: "localhost:9092"
        schema: '{"type": "record", "name": "AdEvent", "fields": [{"name": "media_type", "type": "string"}, {"name": "ad_type", "type": "int"}]}'
```
But it shows the following error:
```
Building pipeline...
INFO:apache_beam.yaml.yaml_transform:Expanding "ReadFromKafka" at line 3
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/main.py", line 75, in <module>
    run()
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/main.py", line 68, in run
    yaml_transform.expand_pipeline(p, pipeline_spec)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 970, in expand_pipeline
    return YamlTransform(
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 944, in expand
    result = expand_transform(
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 424, in expand_transform
    return expand_composite_transform(spec, scope)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 498, in expand_composite_transform
    return CompositePTransform.expand(None)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 489, in expand
    inner_scope.compute_all()
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 218, in compute_all
    self.compute_outputs(transform_id)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 68, in wrapper
    self._cache[key] = func(self, *args)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 252, in compute_outputs
    return expand_transform(self._transforms_by_uuid[transform_id], self)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 426, in expand_transform
    return expand_leaf_transform(spec, scope)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 448, in expand_leaf_transform
    ptransform = scope.create_ptransform(spec, inputs_dict.values())
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 351, in create_ptransform
    provider.requires_inputs(spec['type'], config)):
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_provider.py", line 719, in requires_inputs
    return self._underlying_provider.requires_inputs(typ, args)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_provider.py", line 172, in requires_inputs
    if self._urns[type] in self.schema_transforms():
KeyError: <class 'type'>
```
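The `KeyError: <class 'type'>` at the bottom of the traceback suggests that `requires_inputs` in `yaml_provider.py` is indexing `self._urns` with the Python builtin `type` instead of its `typ` parameter (the frame just above passes `typ`). A minimal sketch of that failure mode, with an illustrative dict standing in for `self._urns`:

```python
# Illustrative stand-in for the provider's URN mapping; the dict contents
# are assumptions, only the lookup behavior is the point.
_urns = {'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'}

typ = 'ReadFromKafka'

try:
    _urns[type]  # builtin class `type` is not a key in the dict
except KeyError as e:
    print(e)  # prints: <class 'type'>

print(_urns[typ])  # the intended lookup by the `typ` argument succeeds
```

If that reading is right, renaming `type` to `typ` on that line would let the lookup hit the registered URN instead of raising.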
Most likely I'm missing something. When I registered the transform as "full
Python" under `'ReadFromKafka': 'apache_beam.yaml.yaml_io.read_from_kafka'`, it
worked fine, although it was limited to the raw format only. The only reason I
tried this is that I wanted to explore expanding the Java option [which I think
is the right way to go].