josh-fell commented on code in PR #30175:
URL: https://github.com/apache/airflow/pull/30175#discussion_r1157200175


##########
airflow/providers/apache/kafka/example_dags/__init__.py:
##########


Review Comment:
   The concept of Example DAGs has transitioned to System Tests via [AIP-47](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-47+New+design+of+Airflow+System+Tests). Since these DAGs exist in `system/providers/apache/kafka/` already, the `example_dags` directory can be removed.



##########
airflow/providers/apache/kafka/hooks/produce.py:
##########
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from confluent_kafka import Producer
+
+from airflow.providers.apache.kafka.hooks.base import KafkaHook
+
+
+class KafkaProducerHook(KafkaHook):
+    """
+    A hook for creating a Kafka Producer
+    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+    """
+
+    def __init__(self, kafka_config_id=KafkaHook.default_conn_name) -> None:
+        super().__init__(kafka_config_id=kafka_config_id)
+
+        if not self.get_conn().get("bootstrap.servers", None):

Review Comment:
   Similarly, this validation is done within `get_conn()` itself. This doesn't look to be needed.
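
   If the check is dropped, the constructor reduces to a one-liner. A minimal sketch, assuming `get_conn()` keeps its current "bootstrap.servers" validation:
   ```python
   def __init__(self, kafka_config_id=KafkaHook.default_conn_name) -> None:
       # get_conn() already raises if "bootstrap.servers" is missing from the
       # connection extras, so no extra validation is needed here.
       super().__init__(kafka_config_id=kafka_config_id)
   ```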



##########
airflow/providers/apache/kafka/operators/consume.py:
##########
@@ -0,0 +1,184 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
+from airflow.utils.module_loading import import_string
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """An operator that consumes from Kafka a topic(s) and processing the 
messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+    :param topics: A list of topics or regex patterns the consumer should subscribe to.
+    :param apply_function: The function that should be applied to fetched messages one at a time; either a
+        callable or the dotted import path of the function (module and function name delimited by a `.`).
+    :param apply_function_batch: The function that should be applied to a batch of messages fetched. Cannot
+        be used with `apply_function`. Intended for transactional workloads where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None
+    :param apply_function_kwargs: Additional keyword arguments that should be applied to the callable,
+        defaults to None
+    :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch", "end_of_operator"),
+        defaults to "end_of_operator";
+        if "end_of_operator", commit() is called based on the max_messages arg. Commits are made after the
+        operator has applied the apply_function to the maximum number of messages.
+        if "end_of_batch", commit() is called based on the max_batch_size arg. Commits are made after each
+        batch has been processed by the apply_function for all messages in the batch.
+        if "never", close() is called without calling commit().
+    :param max_messages: The maximum total number of messages an operator should read from Kafka,
+        defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should read when polling,
+        defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are
+        available, defaults to 60
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:ConsumeFromTopicOperator`
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "kafka_config_id",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        kafka_config_id: str = "kafka_default",
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function
+        self.apply_function_batch = apply_function_batch
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.kafka_config_id = kafka_config_id
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True
+        self.max_batch_size = max_batch_size
+        self.poll_timeout = poll_timeout
+
+        if self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(
+                f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got 
{self.commit_cadence}"
+            )
+
+        if self.max_messages and self.max_batch_size > self.max_messages:
+            self.log.warning(
+                "max_batch_size (%s) > max_messages" " (%s). Setting 
max_messages to" " %s ",

Review Comment:
   ```suggestion
                   "max_batch_size (%s) > max_messages (%s). Setting 
max_messages to %s ",
   ```
   Probably some funny business from `black` re-formatting.



##########
airflow/providers/apache/kafka/hooks/produce.py:
##########
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from confluent_kafka import Producer
+
+from airflow.providers.apache.kafka.hooks.base import KafkaHook
+
+
+class KafkaProducerHook(KafkaHook):
+    """
+    A hook for creating a Kafka Producer
+    :param kafka_config_id: The connection object to use, defaults to "kafka_default"

Review Comment:
   ```suggestion
       A hook for creating a Kafka Producer
   
       :param kafka_config_id: The connection object to use, defaults to "kafka_default"
   ```
   Otherwise the parameters won't render correctly in the Python API docs.



##########
airflow/providers/apache/kafka/operators/consume.py:
##########
@@ -0,0 +1,184 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
+from airflow.utils.module_loading import import_string
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """An operator that consumes from Kafka a topic(s) and processing the 
messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+    :param topics: A list of topics or regex patterns the consumer should subscribe to.
+    :param apply_function: The function that should be applied to fetched messages one at a time; either a
+        callable or the dotted import path of the function (module and function name delimited by a `.`).
+    :param apply_function_batch: The function that should be applied to a batch of messages fetched. Cannot
+        be used with `apply_function`. Intended for transactional workloads where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None
+    :param apply_function_kwargs: Additional keyword arguments that should be applied to the callable,
+        defaults to None
+    :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch", "end_of_operator"),
+        defaults to "end_of_operator";
+        if "end_of_operator", commit() is called based on the max_messages arg. Commits are made after the
+        operator has applied the apply_function to the maximum number of messages.
+        if "end_of_batch", commit() is called based on the max_batch_size arg. Commits are made after each
+        batch has been processed by the apply_function for all messages in the batch.
+        if "never", close() is called without calling commit().
+    :param max_messages: The maximum total number of messages an operator should read from Kafka,
+        defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should read when polling,
+        defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are
+        available, defaults to 60
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:ConsumeFromTopicOperator`
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "kafka_config_id",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        kafka_config_id: str = "kafka_default",
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function
+        self.apply_function_batch = apply_function_batch
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.kafka_config_id = kafka_config_id
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True
+        self.max_batch_size = max_batch_size
+        self.poll_timeout = poll_timeout
+
+        if self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(
+                f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got 
{self.commit_cadence}"
+            )
+
+        if self.max_messages and self.max_batch_size > self.max_messages:
+            self.log.warning(
+                "max_batch_size (%s) > max_messages" " (%s). Setting 
max_messages to" " %s ",
+                self.max_batch_size,
+                self.max_messages,
+                self.max_batch_size,
+            )
+
+        if self.commit_cadence == "never":
+            self.commit_cadence = None
+
+        if apply_function and apply_function_batch:
+            raise AirflowException(
+                "One of apply_function or apply_function_batch must be 
supplied, not both."
+            )
+
+    def execute(self, context) -> Any:
+
+        consumer = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id).get_consumer()
+
+        if isinstance(self.apply_function, str):
+            self.apply_function = import_string(self.apply_function)
+
+        if isinstance(self.apply_function_batch, str):
+            self.apply_function_batch = import_string(self.apply_function_batch)
+
+        if self.apply_function:
+            apply_callable = partial(
+                self.apply_function, *self.apply_function_args, **self.apply_function_kwargs  # type: ignore
+            )
+
+        if self.apply_function_batch:
+            apply_callable = partial(
+                self.apply_function_batch,  # type: ignore
+                *self.apply_function_args,
+                **self.apply_function_kwargs,
+            )
+
+        messages_left = self.max_messages
+        messages_processed = 0
+
+        while (
+            messages_left > 0
+        ):  # bool(True > 0) == True in the case where self.max_messages isn't set by the user
+
+            if not isinstance(messages_left, bool):
+                batch_size = self.max_batch_size if messages_left > self.max_batch_size else messages_left
+            else:
+                batch_size = self.max_batch_size
+
+            msgs = consumer.consume(num_messages=batch_size, timeout=self.poll_timeout)
+            messages_left -= len(msgs)
+            messages_processed += len(msgs)
+
+            if not msgs:  # No messages + messages_left is being used.
+                self.log.info("Reached end of log. Exiting.")
+                break
+
+            if self.apply_function:
+                for m in msgs:
+                    apply_callable(m)
+
+            if self.apply_function_batch:
+                apply_callable(msgs)
+
+            if self.commit_cadence == "end_of_batch":
+                self.log.info("committing offset at %s", self.commit_cadence)
+                consumer.commit()
+
+        if self.commit_cadence:
+            self.log.info("committing offset at %s", self.commit_cadence)
+            consumer.commit()
+
+        consumer.close()
+
+        return

Review Comment:
   ```suggestion
   ```
   Not necessary and the return type annotation for `execute()` can be updated as well.
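
   A minimal sketch of how the tail of `execute()` could look with the annotation updated (the consumption loop itself is unchanged):
   ```python
   def execute(self, context) -> None:
       consumer = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id).get_consumer()
       # ... batching loop as above ...
       if self.commit_cadence:
           self.log.info("committing offset at %s", self.commit_cadence)
           consumer.commit()
       consumer.close()  # no trailing `return` needed
   ```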



##########
airflow/providers/apache/kafka/operators/produce.py:
##########
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
+from airflow.utils.module_loading import import_string
+
+local_logger = logging.getLogger("airflow")
+
+
+def acked(err, msg):
+    if err is not None:
+        local_logger.error(f"Failed to deliver message: {err}")
+    else:
+        local_logger.info(
+            f"Produced record to topic {msg.topic()} partition 
[{msg.partition()}] @ offset {msg.offset()}"
+        )
+
+
+class ProduceToTopicOperator(BaseOperator):
+    """An operator that produces messages to a Kafka topic
+
+    Registers a producer to a kafka topic and publishes messages to the log.
+
+    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+    :param topic: The topic the producer should produce to, defaults to None
+    :param producer_function: The function that generates key/value pairs as messages for production,
+        defaults to None
+    :param producer_function_args: Additional arguments to be applied to the producer callable,
+        defaults to None
+    :param producer_function_kwargs: Additional keyword arguments to be applied to the producer callable,
+        defaults to None
+    :param delivery_callback: The callback to apply after delivery (or failure) of a message, defaults to None
+    :param synchronous: If writes to kafka should be fully synchronous, defaults to True
+    :param poll_timeout: How long of a delay should be applied when calling poll after production to kafka,
+        defaults to 0
+    :raises AirflowException: _description_
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:ProduceToTopicOperator`
+    """
+
+    template_fields = (
+        "topic",
+        "producer_function",
+        "producer_function_args",
+        "producer_function_kwargs",
+        "kafka_config_id",
+    )
+
+    def __init__(
+        self,
+        topic: str,
+        producer_function: str | Callable[..., Any],
+        kafka_config_id: str = "kafka_default",
+        producer_function_args: Sequence[Any] | None = None,
+        producer_function_kwargs: dict[Any, Any] | None = None,
+        delivery_callback: str | None = None,
+        synchronous: bool = True,
+        poll_timeout: float = 0,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        if delivery_callback:
+            dc = import_string(delivery_callback)
+        else:
+            dc = acked
+
+        self.kafka_config_id = kafka_config_id
+        self.topic = topic
+        self.producer_function = producer_function
+        self.producer_function_args = producer_function_args or ()
+        self.producer_function_kwargs = producer_function_kwargs or {}
+        self.delivery_callback = dc
+        self.synchronous = synchronous
+        self.poll_timeout = poll_timeout
+
+        if not (self.topic and self.producer_function):
+            raise AirflowException(
+                "topic and producer_function must be provided. Got topic="
+                + f"{self.topic} and 
producer_function={self.producer_function}"

Review Comment:
   ```suggestion
                   f"{self.topic} and 
producer_function={self.producer_function}"
   ```
   The `+` operator isn't needed. The string will still concatenate as expected because of [string literal concatenation](https://docs.python.org/2/reference/lexical_analysis.html#string-literal-concatenation).
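
   For illustration, adjacent string literals (including f-strings) are joined at compile time, so no runtime `+` is needed:
   ```python
   topic, producer_function = None, None

   # Adjacent literals concatenate without any `+` operator:
   message = (
       "topic and producer_function must be provided. Got topic="
       f"{topic} and producer_function={producer_function}"
   )
   assert message == ("topic and producer_function must be provided. "
                      "Got topic=None and producer_function=None")
   ```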



##########
tests/providers/apache/kafka/hooks/test_admin_client.py:
##########
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import logging
+
+import pytest
+
+from airflow.models import Connection
+from airflow.providers.apache.kafka.hooks.base import KafkaHook
+from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
+from airflow.utils import db
+
+log = logging.getLogger(__name__)
+
+
+class TestSampleHook:
+    """
+    Test Admin Client Hook.
+    """
+
+    def setup_method(self):
+        db.merge_conn(
+            Connection(
+                conn_id="kafka_default",
+                conn_type="kafka",
+                extra=json.dumps(
+                    {"socket.timeout.ms": 10, "bootstrap.servers": 
"localhost:9092", "group.id": "test_group"}
+                ),
+            )
+        )
+
+        db.merge_conn(
+            Connection(
+                conn_id="kafka_bad",
+                conn_type="kafka",
+                extra=json.dumps({"socket.timeout.ms": 10}),
+            )
+        )
+
+    def test_init(self):
+        """test initialization of AdminClientHook"""
+
+        # Standard Init
+        KafkaAdminClientHook(kafka_config_id="kafka_default")
+
+        # # Not Enough Args
+        with pytest.raises(ValueError):
+            KafkaAdminClientHook(kafka_config_id="kafka_bad")
+
+    def test_create_topic(self, mocker):

Review Comment:
   What is the assertion being tested here?
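
   For example, something along these lines would make the expectation explicit (the patch target and hook method names here are illustrative, not prescriptive):
   ```python
   def test_create_topic(self, mocker):
       # Patch the underlying AdminClient and assert the call-through.
       mock_admin = mocker.patch("airflow.providers.apache.kafka.hooks.client.AdminClient")
       hook = KafkaAdminClientHook(kafka_config_id="kafka_default")
       hook.create_topic(topics=[("topic_test", 1, 1)])
       mock_admin.return_value.create_topics.assert_called_once()
   ```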



##########
tests/providers/apache/kafka/hooks/test_consumer.py:
##########
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+
+import pytest
+
+from airflow.models import Connection
+
+# Import Hook
+from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
+from airflow.utils import db
+
+
+class TestConsumerHook:
+    """
+    Test consumer hook.
+    """
+
+    def setup_method(self):
+        db.merge_conn(
+            Connection(
+                conn_id="kafka_default",
+                conn_type="kafka",
+                extra=json.dumps(
+                    {"socket.timeout.ms": 10, "bootstrap.servers": 
"localhost:9092", "group.id": "test_group"}
+                ),
+            )
+        )
+
+        db.merge_conn(
+            Connection(
+                conn_id="kafka_bad",
+                conn_type="kafka",
+                extra=json.dumps({}),
+            )
+        )
+
+    def test_init(self):
+        """test initialization of AdminClientHook"""
+
+        # Standard Init
+        KafkaConsumerHook(["test_1"], kafka_config_id="kafka_default")

Review Comment:
   Should probably at least assert instance attrs of the hook are set as expected.
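
   A sketch, assuming the base hook stores `kafka_config_id` on the instance:
   ```python
   def test_init(self):
       """test initialization of KafkaConsumerHook"""
       hook = KafkaConsumerHook(["test_1"], kafka_config_id="kafka_default")
       assert hook.topics == ["test_1"]
       assert hook.kafka_config_id == "kafka_default"
   ```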



##########
airflow/providers/apache/kafka/operators/produce.py:
##########
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
+from airflow.utils.module_loading import import_string
+
+local_logger = logging.getLogger("airflow")
+
+
+def acked(err, msg):
+    if err is not None:
+        local_logger.error(f"Failed to deliver message: {err}")
+    else:
+        local_logger.info(
+            f"Produced record to topic {msg.topic()} partition 
[{msg.partition()}] @ offset {msg.offset()}"
+        )
+
+
+class ProduceToTopicOperator(BaseOperator):
+    """An operator that produces messages to a Kafka topic
+
+    Registers a producer to a kafka topic and publishes messages to the log.
+
+    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
+    :param topic: The topic the producer should produce to, defaults to None
+    :param producer_function: The function that generates key/value pairs as messages for production,
+        defaults to None
+    :param producer_function_args: Additional arguments to be applied to the producer callable,
+        defaults to None
+    :param producer_function_kwargs: Additional keyword arguments to be applied to the producer callable,
+        defaults to None
+    :param delivery_callback: The callback to apply after delivery (or failure) of a message, defaults to None
+    :param synchronous: If writes to kafka should be fully synchronous, defaults to True
+    :param poll_timeout: How long of a delay should be applied when calling poll after production to kafka,
+        defaults to 0
+    :raises AirflowException: _description_
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:ProduceToTopicOperator`
+    """
+
+    template_fields = (
+        "topic",
+        "producer_function",
+        "producer_function_args",
+        "producer_function_kwargs",
+        "kafka_config_id",
+    )
+
+    def __init__(
+        self,
+        topic: str,
+        producer_function: str | Callable[..., Any],
+        kafka_config_id: str = "kafka_default",
+        producer_function_args: Sequence[Any] | None = None,
+        producer_function_kwargs: dict[Any, Any] | None = None,
+        delivery_callback: str | None = None,
+        synchronous: bool = True,
+        poll_timeout: float = 0,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        if delivery_callback:
+            dc = import_string(delivery_callback)
+        else:
+            dc = acked
+
+        self.kafka_config_id = kafka_config_id
+        self.topic = topic
+        self.producer_function = producer_function
+        self.producer_function_args = producer_function_args or ()
+        self.producer_function_kwargs = producer_function_kwargs or {}
+        self.delivery_callback = dc
+        self.synchronous = synchronous
+        self.poll_timeout = poll_timeout
+
+        if not (self.topic and self.producer_function):
+            raise AirflowException(
+                "topic and producer_function must be provided. Got topic="
+                + f"{self.topic} and 
producer_function={self.producer_function}"
+            )
+
+        return
+
+    def execute(self, context) -> Any:

Review Comment:
   ```suggestion
       def execute(self, context) -> None:
   ```



##########
tests/providers/apache/kafka/operators/test_consume.py:
##########


Review Comment:
   What are the assertions in these tests, either positive or negative? Same comment in test_produce.py as well.
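
   As a sketch of what a positive/negative assertion could look like here (mock names are illustrative; this relies on `get_consumer()` as used in `execute()`, and assumes the usual imports of `ConsumeFromTopicOperator` and `KafkaConsumerHook` in the test module):
   ```python
   def test_consume_commits_and_closes(self, mocker):
       mock_consumer = mocker.MagicMock()
       # One non-empty batch, then an empty poll to end the loop.
       mock_consumer.consume.side_effect = [["msg1", "msg2"], []]
       mocker.patch.object(KafkaConsumerHook, "get_consumer", return_value=mock_consumer)

       operator = ConsumeFromTopicOperator(
           task_id="consume",
           topics=["test"],
           apply_function=lambda m: m,
           commit_cadence="end_of_operator",
       )
       operator.execute(context={})

       mock_consumer.commit.assert_called_once()
       mock_consumer.close.assert_called_once()
   ```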



##########
airflow/providers/apache/kafka/hooks/consume.py:
##########
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Sequence
+
+from confluent_kafka import Consumer
+
+from airflow.providers.apache.kafka.hooks.base import KafkaHook
+
+
+class KafkaConsumerHook(KafkaHook):
+    """
+    A hook for creating a Kafka Consumer
+
+    :param kafka_config_id: The connection object to use, defaults to "kafka_config_default"
+    :param topics: A list of topics to subscribe to.
+    """
+
+    def __init__(self, topics: Sequence[str], kafka_config_id=KafkaHook.default_conn_name) -> None:
+
+        super().__init__(kafka_config_id=kafka_config_id)
+        self.topics = topics
+        if not self.get_conn().get("group.id", None):
+            raise ValueError("The 'group.id' parameter must be set in the config dictionary'. Got <None>")
+
+        if not self.get_conn().get("bootstrap.servers", None):

Review Comment:
   +1 for this suggestion in the base hook.
   
   Also, `get_conn()` already raises a `ValueError` if "bootstrap.servers" doesn't exist in the connection extras.
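
   With that, the constructor could slim down to something like this sketch (keeping only the consumer-specific check, or moving it to the base hook per the suggestion above):
   ```python
   def __init__(self, topics: Sequence[str], kafka_config_id=KafkaHook.default_conn_name) -> None:
       super().__init__(kafka_config_id=kafka_config_id)
       self.topics = topics
       # "bootstrap.servers" is already validated by get_conn(); only the
       # consumer-specific "group.id" check remains.
       if not self.get_conn().get("group.id"):
           raise ValueError("The 'group.id' parameter must be set in the config dictionary. Got <None>")
   ```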



##########
tests/providers/apache/kafka/hooks/test_admin_client.py:
##########


Review Comment:
   Some coverage on `get_conn()` (or if it turns into a cached_property) in all of the hook tests generally (even in test_consumer.py and test_producer.py) would be great.
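
   e.g., a sketch (assuming `get_conn()` returns the config dict, as the hooks above imply):
   ```python
   def test_get_conn(self):
       # Valid connection surfaces the extras as the client config.
       config = KafkaAdminClientHook(kafka_config_id="kafka_default").get_conn()
       assert config["bootstrap.servers"] == "localhost:9092"

       # Missing "bootstrap.servers" should raise.
       with pytest.raises(ValueError):
           KafkaAdminClientHook(kafka_config_id="kafka_bad").get_conn()
   ```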



##########
tests/providers/apache/kafka/hooks/test_admin_client.py:
##########
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import logging
+
+import pytest
+
+from airflow.models import Connection
+from airflow.providers.apache.kafka.hooks.base import KafkaHook
+from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
+from airflow.utils import db
+
+log = logging.getLogger(__name__)
+
+
+class TestSampleHook:
+    """
+    Test Admin Client Hook.
+    """
+
+    def setup_method(self):
+        db.merge_conn(
+            Connection(
+                conn_id="kafka_default",
+                conn_type="kafka",
+                extra=json.dumps(
+                    {"socket.timeout.ms": 10, "bootstrap.servers": 
"localhost:9092", "group.id": "test_group"}
+                ),
+            )
+        )
+
+        db.merge_conn(
+            Connection(
+                conn_id="kafka_bad",
+                conn_type="kafka",
+                extra=json.dumps({"socket.timeout.ms": 10}),
+            )
+        )
+
+    def test_init(self):
+        """test initialization of AdminClientHook"""
+
+        # Standard Init
+        KafkaAdminClientHook(kafka_config_id="kafka_default")
+
+        # # Not Enough Args
+        with pytest.raises(ValueError):
+            KafkaAdminClientHook(kafka_config_id="kafka_bad")
+
+    def test_create_topic(self, mocker):

Review Comment:
   WDYT about adding a test re: topics that already exist?
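
   e.g., a sketch along these lines (the patch target, hook method name, and the expectation that the hook logs-and-continues on this error are all assumptions):
   ```python
   def test_create_topic_already_exists(self, mocker):
       from confluent_kafka import KafkaError, KafkaException

       mock_admin = mocker.patch("airflow.providers.apache.kafka.hooks.client.AdminClient")
       future = mocker.MagicMock()
       # Simulate the broker reporting that the topic already exists.
       future.result.side_effect = KafkaException(KafkaError(KafkaError.TOPIC_ALREADY_EXISTS))
       mock_admin.return_value.create_topics.return_value = {"topic_test": future}

       hook = KafkaAdminClientHook(kafka_config_id="kafka_default")
       # Assuming the hook is meant to log a warning and continue rather than fail:
       hook.create_topic(topics=[("topic_test", 1, 1)])
   ```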



##########
docs/apache-airflow-providers-apache-kafka/connections/kafka.rst:
##########
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:kafka
+
+Apache Kafka Connection
+========================
+
+The Apache Kafka connection type configures a connection to Apache Kafka via the ``confluent-kafka`` Python package.
+
+Default Connection IDs
+----------------------
+
+Kafka hooks and operators use ``kafka_default`` by default.
+
+Configuring the Connection
+--------------------------
+
+Connections are configured as a JSON-serializable string provided to the ``extra`` field. A full list of
+parameters is described in the `Confluent Kafka python library <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.
+
+If you are defining the Airflow connection from the Airflow UI, the ``extra`` field will be renamed to ``Config Dict``.
+
+Most operators and hooks will check that, at a minimum, the ``bootstrap.servers`` key exists and has a value set in order to be valid.

Review Comment:
   Since checking that "bootstrap.servers" exists is done within `get_conn()`, this config would be required for all hook, operator, and sensor use right?
   
   If so, I think it would be wise to make Bootstrap Servers an explicit field in the Airflow UI too.
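
   If it helps, Airflow hooks can customize the connection form; a sketch using the standard `get_ui_field_behaviour` hook method (the exact fields hidden and placeholder text are illustrative):
   ```python
   @staticmethod
   def get_ui_field_behaviour() -> dict:
       # Hide unused standard fields, relabel "extra" as the config dict, and
       # hint at the required "bootstrap.servers" key. A dedicated widget could
       # alternatively be added via get_connection_form_widgets().
       return {
           "hidden_fields": ["schema", "login", "password", "port", "host"],
           "relabeling": {"extra": "Config Dict"},
           "placeholders": {"extra": '{"bootstrap.servers": "localhost:9092"}'},
       }
   ```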


