jrmccluskey commented on code in PR #36211:
URL: https://github.com/apache/beam/pull/36211#discussion_r2495628480
##########
sdks/python/apache_beam/io/kafka.py:
##########
@@ -110,22 +110,19 @@
ReadFromKafkaSchema = typing.NamedTuple(
'ReadFromKafkaSchema',
- [
- ('consumer_config', typing.Mapping[str, str]),
- ('topics', typing.List[str]),
- ('key_deserializer', str),
- ('value_deserializer', str),
- ('start_read_time', typing.Optional[int]),
- ('max_num_records', typing.Optional[int]),
- ('max_read_time', typing.Optional[int]),
- ('commit_offset_in_finalize', bool),
- ('timestamp_policy', str),
- ('consumer_polling_timeout', typing.Optional[int]),
- ('redistribute', typing.Optional[bool]),
- ('redistribute_num_keys', typing.Optional[np.int32]),
- ('allow_duplicates', typing.Optional[bool]),
- ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
- ])
+ [('consumer_config', typing.Mapping[str, str]),
+ ('topics', typing.List[str]), ('key_deserializer', str),
+ ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
+ ('max_num_records', typing.Optional[int]),
+ ('max_read_time', typing.Optional[int]),
+ ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
+ ('consumer_polling_timeout', typing.Optional[int]),
+ ('redistribute', typing.Optional[bool]),
+ ('redistribute_num_keys', typing.Optional[np.int32]),
+ ('allow_duplicates', typing.Optional[bool]),
+ ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
+ ('consumer_factory_fn_class', typing.Optional[str]),
+ ('consumer_factory_fn_params', typing.Optional[typing.Mapping[str,
str]])])
Review Comment:
Use `collections.abc.Mapping` here, `typing.Mapping` is deprecated
##########
sdks/python/apache_beam/io/kafka.py:
##########
@@ -216,6 +214,13 @@ def __init__(
:param dynamic_read_poll_interval_seconds: The interval in seconds at which
to check for new partitions. If not None, dynamic partition discovery
is enabled.
+ :param consumer_factory_fn_class: A fully qualified classpath to an
+ existing provided consumerFactoryFn. If not None, this will construct
+ Kafka consumers with a custom configuration.
+ :param consumer_factory_fn_params: A map which specifies the parameters for
+ the provided consumer_factory_fn_class. IF not None, the values in this
Review Comment:
```suggestion
the provided consumer_factory_fn_class. If not None, the values in
this
```
##########
sdks/python/apache_beam/io/kafka.py:
##########
@@ -173,7 +170,8 @@ def __init__(
redistribute_num_keys=np.int32(0),
allow_duplicates=False,
dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
- ):
+ consumer_factory_fn_class=None,
+ consumer_factory_fn_params=None):
Review Comment:
Consider typing these args
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]