josh-fell commented on code in PR #30175:
URL: https://github.com/apache/airflow/pull/30175#discussion_r1140876165


##########
airflow/providers/apache/kafka/hooks/consumer.py:
##########
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from confluent_kafka import Consumer
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class KafkaConsumerHook(BaseHook):
+    """KafkaConsumerHook
+
+    A hook for creating a Kafka Consumer
+    :param config: A config dictionary to use with confluent_kafka library, 
defaults to None
+    """
+
+
+    def __init__(
+        self,
+        topics: Sequence[str],
+        config: dict[Any, Any] | None = None,
+    ) -> None:
+        super().__init__()
+
+        self.config: dict[Any, Any] = config or {}
+        self.topics = topics
+
+        if not self.config.get("group.id", None):
+            raise AirflowException(
+                "The 'group.id' parameter must be set in the config 
dictionary'. Got <None>"
+            )
+
+        if not self.config.get("bootstrap.servers", None):
+            raise AirflowException("config['bootsrap.servers'] must be 
provided.")

Review Comment:
   Should `group.id` and `bootstrap.servers` be their own explicit params for 
this hook?



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from Kafka a 
topic(s) and processing the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subsrcribe to.
+    :param apply_function: The function that should be applied to fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional key word arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
+        confluent-python-kafka documentation), defaults to None, defaults to 
None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messsages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",

Review Comment:
   ```suggestion
           commit_cadence: str = "end_of_operator",
   ```



##########
airflow/providers/apache/kafka/hooks/consumer.py:
##########
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from confluent_kafka import Consumer
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class KafkaConsumerHook(BaseHook):
+    """KafkaConsumerHook
+
+    A hook for creating a Kafka Consumer
+    :param config: A config dictionary to use with confluent_kafka library, 
defaults to None

Review Comment:
   ```suggestion
       A hook for creating a Kafka Consumer
   
       :param config: A config dictionary to use with confluent_kafka library, 
defaults to None
   ```
   The blank line is needed between the description and parameter directives so 
the parameters are properly rendered in the Python API docs. This comment 
applies to other instances in the provider as well.



##########
airflow/providers/apache/kafka/hooks/admin_client.py:
##########
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from confluent_kafka.admin import AdminClient, NewTopic
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class KafkaAdminClientHook(BaseHook):
+    """KafkaAdminClientHook
+
+    A hook for interacting with the Kafka Cluster

Review Comment:
   ```suggestion
       """
       A hook for interacting with the Kafka Cluster
   ```
   Having the hook name at the beginning of the docstring seems redundant if 
reading in the Python API docs. Same comment applies to other 
operators/hooks/triggers/methods/functions in this provider too.



##########
airflow/providers/apache/kafka/hooks/admin_client.py:
##########
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from confluent_kafka.admin import AdminClient, NewTopic
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class KafkaAdminClientHook(BaseHook):
+    """KafkaAdminClientHook
+
+    A hook for interacting with the Kafka Cluster
+
+    :param config: A config dictionary to use with confluent_kafka library, 
defaults to None
+    """
+
+    def __init__(
+        self,
+        config: dict[Any, Any] | None = None,
+    ) -> None:
+        super().__init__()
+
+        self.config: dict[Any, Any] = config or {}
+
+        if not (self.config.get("bootstrap.servers", None)):
+            raise AirflowException("config['bootsrap.servers'] must be 
provided.")

Review Comment:
   WDYT about breaking out `bootstrap_servers` into its own parameter since 
it's required configuration? Then the `config` param can include _optional_ 
configuration for the underlying lib.



##########
airflow/providers/apache/kafka/operators/await_message.py:
##########
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.triggers.await_message import 
AwaitMessageTrigger
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class AwaitKafkaMessageOperator(BaseOperator):
+    """AwaitKafkaMessageOperator An Airflow operator that defers until a 
specific message is published to Kafka.
+
+    The operator creates a consumer that reads the Kafka log until it 
encounters a positive event.
+
+    The behavior of the consumer for this trigger is as follows:
+    - poll the Kafka topics for a message
+    - if no message returned, sleep
+    - process the message with provided callable and commit the message offset
+    - if callable returns any data, raise a TriggerEvent with the return data
+    - else continue to next message
+    - return event (as default xcom or specific xcom key)
+
+    :param topics: Topics (or topic regex) to use for reading from
+    :param apply_function: The functoin to apply to messages to determine if 
an event occurred. As a dot
+    notation string.

Review Comment:
   ```suggestion
       :param apply_function: The functoin to apply to messages to determine if 
an event occurred. As a dot
           notation string.
   ```
   I forget how picky Sphinx is, but IIRC if the param description goes beyond 
the first line, each subsequent line needs to be indented. You can [build the 
docs locally with 
Breeze](https://github.com/apache/airflow/blob/main/BREEZE.rst#building-the-documentation)
 and confirm.



##########
airflow/providers/apache/kafka/hooks/producer.py:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from confluent_kafka import Producer
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class KafkaProducerHook(BaseHook):
+    """KafkaProducerHook
+
+    A hook for creating a Kafka Producer
+    :param config: A config dictionary to use with confluent_kafka library, 
defaults to None
+    """
+
+    def __init__(
+        self,
+        config: dict[Any, Any] | None = None,
+        no_broker: bool = False,
+    ) -> None:
+        super().__init__()
+
+        self.config: dict[Any, Any] = config or {}
+
+        if not self.config.get("bootstrap.servers", None):

Review Comment:
   Same comment here re: explicit parameter for a required configuration.



##########
airflow/providers/apache/kafka/hooks/consumer.py:
##########
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, Sequence
+
+from confluent_kafka import Consumer
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class KafkaConsumerHook(BaseHook):
+    """KafkaConsumerHook
+
+    A hook for creating a Kafka Consumer
+    :param config: A config dictionary to use with confluent_kafka library, 
defaults to None
+    """
+
+
+    def __init__(
+        self,
+        topics: Sequence[str],
+        config: dict[Any, Any] | None = None,
+    ) -> None:
+        super().__init__()
+
+        self.config: dict[Any, Any] = config or {}
+        self.topics = topics
+
+        if not self.config.get("group.id", None):
+            raise AirflowException(
+                "The 'group.id' parameter must be set in the config 
dictionary'. Got <None>"
+            )
+
+        if not self.config.get("bootstrap.servers", None):
+            raise AirflowException("config['bootsrap.servers'] must be 
provided.")
+
+    def get_consumer(self) -> Consumer:

Review Comment:
   WDYT about splitting this method into two:
   1. `get_consumer()` which retrieves the `Consumer` object
   2. `subscribe_to_topics()` which uses the `Consumer` object and subscribes 
to an input set of topics?
   
   The subscribe functionality could be used atomically if folks wanted to use 
it and also use the `Consumer` object via `get_consumer()` for other actions 
which might not have an explicit hook method in this provider.



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from Kafka a 
topic(s) and processing the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subsrcribe to.
+    :param apply_function: The function that should be applied to fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional key word arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
+        confluent-python-kafka documentation), defaults to None, defaults to 
None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messsages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function or None
+        self.apply_function_batch = apply_function_batch or None
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.config = consumer_config or {}
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True
+        self.max_batch_size = max_batch_size
+        self.poll_timeout = poll_timeout
+
+        if self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(

Review Comment:
   WDYT about raising a `ValueError` instead? 



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from Kafka a 
topic(s) and processing the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subsrcribe to.
+    :param apply_function: The function that should be applied to fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional key word arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
+        confluent-python-kafka documentation), defaults to None, defaults to 
None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messsages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function or None
+        self.apply_function_batch = apply_function_batch or None
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.config = consumer_config or {}
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True

Review Comment:
   Why does `max_messages` default to `True` if int types are the expected 
arguments?
   
   Maybe there should be an explicit parameter which is the equivalent of 
`max_messages==True`, like `fetch_all_messages`? This would give users the option 
to either specify a maximum number of messages to consume or consume all possible 
messages from the topic.



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from a Kafka 
topic(s) and processes the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subscribe to.
+    :param apply_function: The function that should be applied to messages fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional keyword arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
confluent-python-kafka documentation), defaults to None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has been processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function or None
+        self.apply_function_batch = apply_function_batch or None

Review Comment:
   ```suggestion
           self.apply_function = apply_function
           self.apply_function_batch = apply_function_batch
   ```
   Since `None` is the default.



##########
airflow/providers/apache/kafka/provider.yaml:
##########


Review Comment:
   Should use Apache Kafka rather than Kafka throughout this file.



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from a Kafka 
topic(s) and processes the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subscribe to.
+    :param apply_function: The function that should be applied to messages fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional keyword arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
confluent-python-kafka documentation), defaults to None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has been processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function or None
+        self.apply_function_batch = apply_function_batch or None
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.config = consumer_config or {}
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True
+        self.max_batch_size = max_batch_size
+        self.poll_timeout = poll_timeout
+
+        if self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(
+                f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got 
{self.commit_cadence}"
+            )
+
+        if self.max_messages and self.max_batch_size > self.max_messages:
+            self.log.warning(
+                f"max_batch_size ({self.max_batch_size}) > max_messages"
+                f" ({self.max_messages}). Setting max_messages to"
+                f" {self.max_batch_size}"
+            )

Review Comment:
   Let's avoid using f-string interpolation in logging statements and use 
%-formatting instead.



##########
airflow/providers/apache/kafka/provider.yaml:
##########
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-kafka
+name: Kafka
+
+description: |
+  `Kafka  <https://kafka.apache.org/>`__
+versions:
+  - 0.3.0
+
+dependencies:
+  - apache-airflow>=2.2.0

Review Comment:
   All providers now have a minimum Airflow version of 2.3.0.
   ```suggestion
     - apache-airflow>=2.3.0
   ```



##########
tests/providers/apache/kafka/hooks/test_admin_client.py:
##########
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import unittest
+
+import pytest
+
+from airflow import AirflowException
+
+# Import Hook
+from airflow.providers.apache.kafka.hooks.admin_client import 
KafkaAdminClientHook
+
+log = logging.getLogger(__name__)
+
+
+class TestSampleHook(unittest.TestCase):

Review Comment:
   Net-new tests should use `pytest` instead.



##########
airflow/providers/apache/kafka/operators/produce_to_topic.py:
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.producer import KafkaProducerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+local_logger = logging.getLogger("airflow")
+
+
+def acked(err, msg):
+    if err is not None:
+        local_logger.error(f"Failed to deliver message: {err}")
+    else:
+        local_logger.info(
+            f"Produced record to topic {msg.topic()} partition 
[{msg.partition()}] @ offset {msg.offset()}"
+        )
+
+
+class ProduceToTopicOperator(BaseOperator):
+    """ProduceToTopicOperator An operator that produces messages to a Kafka 
topic
+
+    Registers a producer to a kafka topic and publishes messages to the log.
+
+
+    :param topic: The topic the producer should produce to, defaults to None
+    :param producer_function: The function that generates key/value pairs as 
messages for production, defaults to None
+    :param producer_function_args: Additional arguments to be applied to the 
producer callable, defaults to None
+    :param producer_function_kwargs: Additional keyword arguments to be 
applied to the producer callable,
+        defaults to None
+    :param delivery_callback: The callback to apply after delivery (or failure) 
of a message, defaults to None
+    :param synchronous: If writes to kafka should be fully synchronous, 
defaults to True
+    :param kafka_config: the config dictionary for the kafka client 
(additional information available on the
+        confluent-python-kafka documentation), defaults to None
+    :param poll_timeout: How long of a delay should be applied when calling 
poll after production to kafka,
+         defaults to 0
+    :raises AirflowException: _description_
+    """
+
+    template_fields = (
+        "topic",
+        "producer_function",
+        "producer_function_args",
+        "producer_function_kwargs",
+        "kafka_config",
+    )
+
+    def __init__(
+        self,
+        topic: str,
+        producer_function: str | Callable[..., Any],
+        producer_function_args: Sequence[Any] | None = None,
+        producer_function_kwargs: dict[Any, Any] | None = None,
+        delivery_callback: str | None = None,
+        synchronous: bool | None = True,

Review Comment:
   ```suggestion
           synchronous: bool = True,
   ```



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from a Kafka 
topic(s) and processes the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subscribe to.
+    :param apply_function: The function that should be applied to messages fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional keyword arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
confluent-python-kafka documentation), defaults to None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has been processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,

Review Comment:
   ```suggestion
           poll_timeout: float = 60,
   ```



##########
airflow/providers/apache/kafka/shared_utils.py:
##########
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib
+from typing import Any, Callable
+
+
+def _get_callable(function_string: str) -> Callable:

Review Comment:
   There is a [native `import_string` util 
function](https://github.com/apache/airflow/blob/4f3751aab677904f043d3c0657eb8283d93a9bbd/airflow/utils/module_loading.py#L26-L41)
 that might be used instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to