jrmccluskey commented on code in PR #36211:
URL: https://github.com/apache/beam/pull/36211#discussion_r2495628480


##########
sdks/python/apache_beam/io/kafka.py:
##########
@@ -110,22 +110,19 @@
 
 ReadFromKafkaSchema = typing.NamedTuple(
     'ReadFromKafkaSchema',
-    [
-        ('consumer_config', typing.Mapping[str, str]),
-        ('topics', typing.List[str]),
-        ('key_deserializer', str),
-        ('value_deserializer', str),
-        ('start_read_time', typing.Optional[int]),
-        ('max_num_records', typing.Optional[int]),
-        ('max_read_time', typing.Optional[int]),
-        ('commit_offset_in_finalize', bool),
-        ('timestamp_policy', str),
-        ('consumer_polling_timeout', typing.Optional[int]),
-        ('redistribute', typing.Optional[bool]),
-        ('redistribute_num_keys', typing.Optional[np.int32]),
-        ('allow_duplicates', typing.Optional[bool]),
-        ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
-    ])
+    [('consumer_config', typing.Mapping[str, str]),
+     ('topics', typing.List[str]), ('key_deserializer', str),
+     ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
+     ('max_num_records', typing.Optional[int]),
+     ('max_read_time', typing.Optional[int]),
+     ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
+     ('consumer_polling_timeout', typing.Optional[int]),
+     ('redistribute', typing.Optional[bool]),
+     ('redistribute_num_keys', typing.Optional[np.int32]),
+     ('allow_duplicates', typing.Optional[bool]),
+     ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
+     ('consumer_factory_fn_class', typing.Optional[str]),
+     ('consumer_factory_fn_params', typing.Optional[typing.Mapping[str, 
str]])])

Review Comment:
   Use `collections.abc.Mapping` here, `typing.Mapping` is deprecated



##########
sdks/python/apache_beam/io/kafka.py:
##########
@@ -216,6 +214,13 @@ def __init__(
     :param dynamic_read_poll_interval_seconds: The interval in seconds at which
         to check for new partitions. If not None, dynamic partition discovery
         is enabled.
+    :param consumer_factory_fn_class: A fully qualified classpath to an
+        existing provided consumerFactoryFn. If not None, this will construct
+        Kafka consumers with a custom configuration.
+    :param consumer_factory_fn_params: A map which specifies the parameters for
+        the provided consumer_factory_fn_class. IF not None, the values in this

Review Comment:
   ```suggestion
           the provided consumer_factory_fn_class. If not None, the values in 
this
   ```



##########
sdks/python/apache_beam/io/kafka.py:
##########
@@ -173,7 +170,8 @@ def __init__(
       redistribute_num_keys=np.int32(0),
       allow_duplicates=False,
       dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
-  ):
+      consumer_factory_fn_class=None,
+      consumer_factory_fn_params=None):

Review Comment:
   Consider typing these args



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to