josh-fell commented on code in PR #30175: URL: https://github.com/apache/airflow/pull/30175#discussion_r1140876165
########## airflow/providers/apache/kafka/hooks/consumer.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Any, Sequence + +from confluent_kafka import Consumer + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook + + +class KafkaConsumerHook(BaseHook): + """KafkaConsumerHook + + A hook for creating a Kafka Consumer + :param config: A config dictionary to use with confluent_kafka library, defaults to None + """ + + + def __init__( + self, + topics: Sequence[str], + config: dict[Any, Any] | None = None, + ) -> None: + super().__init__() + + self.config: dict[Any, Any] = config or {} + self.topics = topics + + if not self.config.get("group.id", None): + raise AirflowException( + "The 'group.id' parameter must be set in the config dictionary'. Got <None>" + ) + + if not self.config.get("bootstrap.servers", None): + raise AirflowException("config['bootsrap.servers'] must be provided.") Review Comment: Should `group.id` and `bootstrap.servers` be their own explicit params for this hook? 
########## airflow/providers/apache/kafka/operators/consume_from_topic.py: ########## @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import partial +from typing import Any, Callable, Sequence + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook +from airflow.providers.apache.kafka.shared_utils import _get_callable + +VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} + + +class ConsumeFromTopicOperator(BaseOperator): + """ConsumeFromTopicOperator An operator that consumes from Kafka a topic(s) and processing the messages. + + The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them + using the user supplied callable function. The consumer will continue to read in batches until it reaches + the end of the log or reads a maximum number of messages is reached. + + :param topics: A list of topics or regex patterns the consumer should subsrcribe to. + :param apply_function: The function that should be applied to fetched one at a time. 
+ name of dag file executing the function and the function name delimited by a `.` + :param apply_function_batch: The function that should be applied to a batch of messages fetched. Can not + be used with `apply_function`. Intended for transactional workloads where an expensive task might + be called before or after operations on the messages are taken. + :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None + :param apply_function_kwargs: Additional key word arguments that should be applied to the callable + , defaults to None + :param consumer_config: the config dictionary for the kafka client (additional information available on the + confluent-python-kafka documentation), defaults to None + :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch","end_of_operator"), defaults to "end_of_operator"; + if end_of_operator, the commit() is called based on the max_messages arg. Commits are made after the operator has processed the apply_function method for the maximum messages in the operator. + if end_of_batch, the commit() is called based on the max_batch_size arg. Commits are made after each batch has been processed by the apply_function method for all messages in the batch. + if never, close() is called without calling the commit() method. 
+ :param max_messages: The maximum total number of messages an operator should read from Kafka, + defaults to None + :param max_batch_size: The maximum number of messages a consumer should read when polling, defaults to 1000 + :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are available, + defaults to 60 + """ + + BLUE = "#ffefeb" + ui_color = BLUE + template_fields = ( + "topics", + "apply_function", + "apply_function_args", + "apply_function_kwargs", + "config", + ) + + def __init__( + self, + topics: str | Sequence[str], + apply_function: Callable[..., Any] | str | None = None, + apply_function_batch: Callable[..., Any] | str | None = None, + apply_function_args: Sequence[Any] | None = None, + apply_function_kwargs: dict[Any, Any] | None = None, + consumer_config: dict[Any, Any] | None = None, + commit_cadence: str | None = "end_of_operator", Review Comment: ```suggestion commit_cadence: str = "end_of_operator", ``` ########## airflow/providers/apache/kafka/hooks/consumer.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import Any, Sequence + +from confluent_kafka import Consumer + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook + + +class KafkaConsumerHook(BaseHook): + """KafkaConsumerHook + + A hook for creating a Kafka Consumer + :param config: A config dictionary to use with confluent_kafka library, defaults to None Review Comment: ```suggestion A hook for creating a Kafka Consumer :param config: A config dictionary to use with confluent_kafka library, defaults to None ``` The blank line is needed between the description and parameter directives so the parameters are properly rendered in the Python API docs. This comment applies to other instances in the provider as well. ########## airflow/providers/apache/kafka/hooks/admin_client.py: ########## @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import Any, Sequence + +from confluent_kafka.admin import AdminClient, NewTopic + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook + + +class KafkaAdminClientHook(BaseHook): + """KafkaAdminClientHook + + A hook for interacting with the Kafka Cluster Review Comment: ```suggestion """ A hook for interacting with the Kafka Cluster ``` Having the hook name at the beginning of the docstring seems redundant if reading in the Python API docs. Same comment applies to other operators/hooks/triggers/methods/functions in this provider too. ########## airflow/providers/apache/kafka/hooks/admin_client.py: ########## @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import Any, Sequence + +from confluent_kafka.admin import AdminClient, NewTopic + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook + + +class KafkaAdminClientHook(BaseHook): + """KafkaAdminClientHook + + A hook for interacting with the Kafka Cluster + + :param config: A config dictionary to use with confluent_kafka library, defaults to None + """ + + def __init__( + self, + config: dict[Any, Any] | None = None, + ) -> None: + super().__init__() + + self.config: dict[Any, Any] = config or {} + + if not (self.config.get("bootstrap.servers", None)): + raise AirflowException("config['bootsrap.servers'] must be provided.") Review Comment: WDYT about breaking out `bootstrap_servers` into its own parameter since it's required configuration? Then the `config` param can include _optional_ configuration for the underlying lib. ########## airflow/providers/apache/kafka/operators/await_message.py: ########## @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import Any, Sequence + +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger + +VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} + + +class AwaitKafkaMessageOperator(BaseOperator): + """AwaitKafkaMessageOperator An Airflow operator that defers until a specific message is published to Kafka. + + The operator creates a consumer that reads the Kafka log until it encounters a positive event. + + The behavior of the consumer for this trigger is as follows: + - poll the Kafka topics for a message + - if no message returned, sleep + - process the message with provided callable and commit the message offset + - if callable returns any data, raise a TriggerEvent with the return data + - else continue to next message + - return event (as default xcom or specific xcom key) + + :param topics: Topics (or topic regex) to use for reading from + :param apply_function: The functoin to apply to messages to determine if an event occurred. As a dot + notation string. Review Comment: ```suggestion :param apply_function: The functoin to apply to messages to determine if an event occurred. As a dot notation string. ``` I forget how picky Sphinx is, but IIRC if the param description goes beyond the first line, each subsequent line needs to be indented. You can [build the docs locally with Breeze](https://github.com/apache/airflow/blob/main/BREEZE.rst#building-the-documentation) and confirm. ########## airflow/providers/apache/kafka/hooks/producer.py: ########## @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Any + +from confluent_kafka import Producer + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook + + +class KafkaProducerHook(BaseHook): + """KafkaProducerHook + + A hook for creating a Kafka Producer + :param config: A config dictionary to use with confluent_kafka library, defaults to None + """ + + def __init__( + self, + config: dict[Any, Any] | None = None, + no_broker: bool = False, + ) -> None: + super().__init__() + + self.config: dict[Any, Any] = config or {} + + if not self.config.get("bootstrap.servers", None): Review Comment: Same comment here re: explicit parameter for a required configuration. ########## airflow/providers/apache/kafka/hooks/consumer.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Any, Sequence + +from confluent_kafka import Consumer + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook + + +class KafkaConsumerHook(BaseHook): + """KafkaConsumerHook + + A hook for creating a Kafka Consumer + :param config: A config dictionary to use with confluent_kafka library, defaults to None + """ + + + def __init__( + self, + topics: Sequence[str], + config: dict[Any, Any] | None = None, + ) -> None: + super().__init__() + + self.config: dict[Any, Any] = config or {} + self.topics = topics + + if not self.config.get("group.id", None): + raise AirflowException( + "The 'group.id' parameter must be set in the config dictionary'. Got <None>" + ) + + if not self.config.get("bootstrap.servers", None): + raise AirflowException("config['bootsrap.servers'] must be provided.") + + def get_consumer(self) -> Consumer: Review Comment: WDYT about splitting this method into two: 1. `get_consumer()` which retrieves the `Consumer` object 2. `subscribe_to_topics()` which uses the `Consumer` object and subscribes to an input set of topics? The subscribe functionality could be used atomically if folks wanted to use it and also use the `Consumer` object via `get_consumer()` for other actions which might not have an explicit hook method in this provider. ########## airflow/providers/apache/kafka/operators/consume_from_topic.py: ########## @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import partial +from typing import Any, Callable, Sequence + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook +from airflow.providers.apache.kafka.shared_utils import _get_callable + +VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} + + +class ConsumeFromTopicOperator(BaseOperator): + """ConsumeFromTopicOperator An operator that consumes from Kafka a topic(s) and processing the messages. + + The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them + using the user supplied callable function. The consumer will continue to read in batches until it reaches + the end of the log or reads a maximum number of messages is reached. + + :param topics: A list of topics or regex patterns the consumer should subsrcribe to. + :param apply_function: The function that should be applied to fetched one at a time. + name of dag file executing the function and the function name delimited by a `.` + :param apply_function_batch: The function that should be applied to a batch of messages fetched. Can not + be used with `apply_function`. 
Intended for transactional workloads where an expensive task might + be called before or after operations on the messages are taken. + :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None + :param apply_function_kwargs: Additional key word arguments that should be applied to the callable + , defaults to None + :param consumer_config: the config dictionary for the kafka client (additional information available on the + confluent-python-kafka documentation), defaults to None, defaults to None + :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch","end_of_operator"), defaults to "end_of_operator"; + if end_of_operator, the commit() is called based on the max_messsages arg. Commits are made after the operator has processed the apply_function method for the maximum messages in the operator. + if end_of_batch, the commit() is called based on the max_batch_size arg. Commits are made after each batch has processed by the apply_function method for all messages in the batch. + if never, close() is called without calling the commit() method. 
+ :param max_messages: The maximum total number of messages an operator should read from Kafka, + defaults to None + :param max_batch_size: The maximum number of messages a consumer should read when polling, defaults to 1000 + :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are available, + defaults to 60 + """ + + BLUE = "#ffefeb" + ui_color = BLUE + template_fields = ( + "topics", + "apply_function", + "apply_function_args", + "apply_function_kwargs", + "config", + ) + + def __init__( + self, + topics: str | Sequence[str], + apply_function: Callable[..., Any] | str | None = None, + apply_function_batch: Callable[..., Any] | str | None = None, + apply_function_args: Sequence[Any] | None = None, + apply_function_kwargs: dict[Any, Any] | None = None, + consumer_config: dict[Any, Any] | None = None, + commit_cadence: str | None = "end_of_operator", + max_messages: int | None = None, + max_batch_size: int = 1000, + poll_timeout: float | None = 60, + **kwargs: Any, + ) -> None: + + super().__init__(**kwargs) + + self.topics = topics + self.apply_function = apply_function or None + self.apply_function_batch = apply_function_batch or None + self.apply_function_args = apply_function_args or () + self.apply_function_kwargs = apply_function_kwargs or {} + self.config = consumer_config or {} + self.commit_cadence = commit_cadence + self.max_messages = max_messages or True + self.max_batch_size = max_batch_size + self.poll_timeout = poll_timeout + + if self.commit_cadence not in VALID_COMMIT_CADENCE: + raise AirflowException( Review Comment: WDYT about raising a `ValueError` instead? ########## airflow/providers/apache/kafka/operators/consume_from_topic.py: ########## @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import partial +from typing import Any, Callable, Sequence + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook +from airflow.providers.apache.kafka.shared_utils import _get_callable + +VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} + + +class ConsumeFromTopicOperator(BaseOperator): + """ConsumeFromTopicOperator An operator that consumes from Kafka a topic(s) and processing the messages. + + The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them + using the user supplied callable function. The consumer will continue to read in batches until it reaches + the end of the log or reads a maximum number of messages is reached. + + :param topics: A list of topics or regex patterns the consumer should subsrcribe to. + :param apply_function: The function that should be applied to fetched one at a time. + name of dag file executing the function and the function name delimited by a `.` + :param apply_function_batch: The function that should be applied to a batch of messages fetched. Can not + be used with `apply_function`. Intended for transactional workloads where an expensive task might + be called before or after operations on the messages are taken. 
+ :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None + :param apply_function_kwargs: Additional key word arguments that should be applied to the callable + , defaults to None + :param consumer_config: the config dictionary for the kafka client (additional information available on the + confluent-python-kafka documentation), defaults to None, defaults to None + :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch","end_of_operator"), defaults to "end_of_operator"; + if end_of_operator, the commit() is called based on the max_messsages arg. Commits are made after the operator has processed the apply_function method for the maximum messages in the operator. + if end_of_batch, the commit() is called based on the max_batch_size arg. Commits are made after each batch has processed by the apply_function method for all messages in the batch. + if never, close() is called without calling the commit() method. + :param max_messages: The maximum total number of messages an operator should read from Kafka, + defaults to None + :param max_batch_size: The maximum number of messages a consumer should read when polling, defaults to 1000 + :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are available, + defaults to 60 + """ + + BLUE = "#ffefeb" + ui_color = BLUE + template_fields = ( + "topics", + "apply_function", + "apply_function_args", + "apply_function_kwargs", + "config", + ) + + def __init__( + self, + topics: str | Sequence[str], + apply_function: Callable[..., Any] | str | None = None, + apply_function_batch: Callable[..., Any] | str | None = None, + apply_function_args: Sequence[Any] | None = None, + apply_function_kwargs: dict[Any, Any] | None = None, + consumer_config: dict[Any, Any] | None = None, + commit_cadence: str | None = "end_of_operator", + max_messages: int | None = None, + max_batch_size: int = 1000, + poll_timeout: float | 
None = 60, + **kwargs: Any, + ) -> None: + + super().__init__(**kwargs) + + self.topics = topics + self.apply_function = apply_function or None + self.apply_function_batch = apply_function_batch or None + self.apply_function_args = apply_function_args or () + self.apply_function_kwargs = apply_function_kwargs or {} + self.config = consumer_config or {} + self.commit_cadence = commit_cadence + self.max_messages = max_messages or True Review Comment: Why does `max_messages` default to `True` if int types are the expected arguments? Maybe there should be an explicit parameter which is the equivalent of `max_messages==True` like `fetch_all_messages`? Gives users the option to either specify a maximum number of messages to consume or consume all possible messages from the topic. ########## airflow/providers/apache/kafka/operators/consume_from_topic.py: ########## @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from functools import partial +from typing import Any, Callable, Sequence + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook +from airflow.providers.apache.kafka.shared_utils import _get_callable + +VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} + + +class ConsumeFromTopicOperator(BaseOperator): + """ConsumeFromTopicOperator An operator that consumes from Kafka a topic(s) and processing the messages. + + The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them + using the user supplied callable function. The consumer will continue to read in batches until it reaches + the end of the log or reads a maximum number of messages is reached. + + :param topics: A list of topics or regex patterns the consumer should subsrcribe to. + :param apply_function: The function that should be applied to fetched one at a time. + name of dag file executing the function and the function name delimited by a `.` + :param apply_function_batch: The function that should be applied to a batch of messages fetched. Can not + be used with `apply_function`. Intended for transactional workloads where an expensive task might + be called before or after operations on the messages are taken. 
+ :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None + :param apply_function_kwargs: Additional key word arguments that should be applied to the callable + , defaults to None + :param consumer_config: the config dictionary for the kafka client (additional information available on the + confluent-python-kafka documentation), defaults to None, defaults to None + :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch","end_of_operator"), defaults to "end_of_operator"; + if end_of_operator, the commit() is called based on the max_messsages arg. Commits are made after the operator has processed the apply_function method for the maximum messages in the operator. + if end_of_batch, the commit() is called based on the max_batch_size arg. Commits are made after each batch has processed by the apply_function method for all messages in the batch. + if never, close() is called without calling the commit() method. + :param max_messages: The maximum total number of messages an operator should read from Kafka, + defaults to None + :param max_batch_size: The maximum number of messages a consumer should read when polling, defaults to 1000 + :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are available, + defaults to 60 + """ + + BLUE = "#ffefeb" + ui_color = BLUE + template_fields = ( + "topics", + "apply_function", + "apply_function_args", + "apply_function_kwargs", + "config", + ) + + def __init__( + self, + topics: str | Sequence[str], + apply_function: Callable[..., Any] | str | None = None, + apply_function_batch: Callable[..., Any] | str | None = None, + apply_function_args: Sequence[Any] | None = None, + apply_function_kwargs: dict[Any, Any] | None = None, + consumer_config: dict[Any, Any] | None = None, + commit_cadence: str | None = "end_of_operator", + max_messages: int | None = None, + max_batch_size: int = 1000, + poll_timeout: float | 
None = 60, + **kwargs: Any, + ) -> None: + + super().__init__(**kwargs) + + self.topics = topics + self.apply_function = apply_function or None + self.apply_function_batch = apply_function_batch or None Review Comment: ```suggestion self.apply_function = apply_function self.apply_function_batch = apply_function_batch ``` Since `None` is the default. ########## airflow/providers/apache/kafka/provider.yaml: ########## Review Comment: Should use Apache Kafka rather than Kafka throughout this file. ########## airflow/providers/apache/kafka/operators/consume_from_topic.py: ########## @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import partial +from typing import Any, Callable, Sequence + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook +from airflow.providers.apache.kafka.shared_utils import _get_callable + +VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} + + +class ConsumeFromTopicOperator(BaseOperator): + """ConsumeFromTopicOperator An operator that consumes from Kafka a topic(s) and processing the messages. 
+ + The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them + using the user supplied callable function. The consumer will continue to read in batches until it reaches + the end of the log or reads a maximum number of messages is reached. + + :param topics: A list of topics or regex patterns the consumer should subsrcribe to. + :param apply_function: The function that should be applied to fetched one at a time. + name of dag file executing the function and the function name delimited by a `.` + :param apply_function_batch: The function that should be applied to a batch of messages fetched. Can not + be used with `apply_function`. Intended for transactional workloads where an expensive task might + be called before or after operations on the messages are taken. + :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None + :param apply_function_kwargs: Additional key word arguments that should be applied to the callable + , defaults to None + :param consumer_config: the config dictionary for the kafka client (additional information available on the + confluent-python-kafka documentation), defaults to None, defaults to None + :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch","end_of_operator"), defaults to "end_of_operator"; + if end_of_operator, the commit() is called based on the max_messsages arg. Commits are made after the operator has processed the apply_function method for the maximum messages in the operator. + if end_of_batch, the commit() is called based on the max_batch_size arg. Commits are made after each batch has processed by the apply_function method for all messages in the batch. + if never, close() is called without calling the commit() method. 
+ :param max_messages: The maximum total number of messages an operator should read from Kafka, + defaults to None + :param max_batch_size: The maximum number of messages a consumer should read when polling, defaults to 1000 + :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are available, + defaults to 60 + """ + + BLUE = "#ffefeb" + ui_color = BLUE + template_fields = ( + "topics", + "apply_function", + "apply_function_args", + "apply_function_kwargs", + "config", + ) + + def __init__( + self, + topics: str | Sequence[str], + apply_function: Callable[..., Any] | str | None = None, + apply_function_batch: Callable[..., Any] | str | None = None, + apply_function_args: Sequence[Any] | None = None, + apply_function_kwargs: dict[Any, Any] | None = None, + consumer_config: dict[Any, Any] | None = None, + commit_cadence: str | None = "end_of_operator", + max_messages: int | None = None, + max_batch_size: int = 1000, + poll_timeout: float | None = 60, + **kwargs: Any, + ) -> None: + + super().__init__(**kwargs) + + self.topics = topics + self.apply_function = apply_function or None + self.apply_function_batch = apply_function_batch or None + self.apply_function_args = apply_function_args or () + self.apply_function_kwargs = apply_function_kwargs or {} + self.config = consumer_config or {} + self.commit_cadence = commit_cadence + self.max_messages = max_messages or True + self.max_batch_size = max_batch_size + self.poll_timeout = poll_timeout + + if self.commit_cadence not in VALID_COMMIT_CADENCE: + raise AirflowException( + f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got {self.commit_cadence}" + ) + + if self.max_messages and self.max_batch_size > self.max_messages: + self.log.warning( + f"max_batch_size ({self.max_batch_size}) > max_messages" + f" ({self.max_messages}). 
Setting max_messages to" + f" {self.max_batch_size}" + ) Review Comment: Let's avoid using f-string interpolation in logging statements and use %-formatting instead. ########## airflow/providers/apache/kafka/provider.yaml: ########## @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-kafka +name: Kafka + +description: | + `Kafka <https://kafka.apache.org/>`__ +versions: + - 0.3.0 + +dependencies: + - apache-airflow>=2.2.0 Review Comment: All providers now have a minimum Airflow version of 2.3.0. ```suggestion - apache-airflow>=2.3.0 ``` ########## tests/providers/apache/kafka/hooks/test_admin_client.py: ########## @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import unittest + +import pytest + +from airflow import AirflowException + +# Import Hook +from airflow.providers.apache.kafka.hooks.admin_client import KafkaAdminClientHook + +log = logging.getLogger(__name__) + + +class TestSampleHook(unittest.TestCase): Review Comment: Net-new tests should use `pytest` instead. ########## airflow/providers/apache/kafka/operators/produce_to_topic.py: ########## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import logging +from functools import partial +from typing import Any, Callable, Sequence + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.hooks.producer import KafkaProducerHook +from airflow.providers.apache.kafka.shared_utils import _get_callable + +local_logger = logging.getLogger("airflow") + + +def acked(err, msg): + if err is not None: + local_logger.error(f"Failed to deliver message: {err}") + else: + local_logger.info( + f"Produced record to topic {msg.topic()} partition [{msg.partition()}] @ offset {msg.offset()}" + ) + + +class ProduceToTopicOperator(BaseOperator): + """ProduceToTopicOperator An operator that produces messages to a Kafka topic + + Registers a producer to a kafka topic and publishes messages to the log. + + + :param topic: The topic the producer should produce to, defaults to None + :param producer_function: The function that generates key/value pairs as messages for production, defaults to None + :param producer_function_args: Additional arguments to be applied to the producer callable, defaults to None + :param producer_function_kwargs: Additional keyword arguments to be applied to the producer callable, + defaults to None + :param delivery_callback: The callback to apply after delivery(or failure) of a message, defaults to None + :param synchronous: If writes to kafka should be fully synchronous, defaults to True + :param kafka_config: the config dictionary for the kafka client (additional information available on the + confluent-python-kafka documentation), defaults to None + :param poll_timeout: How long of a delay should be applied when calling poll after production to kafka, + defaults to 0 + :raises AirflowException: _description_ + """ + + template_fields = ( + "topic", + "producer_function", + "producer_function_args", + "producer_function_kwargs", + "kafka_config", + ) + + def __init__( + self, + topic: str, + 
producer_function: str | Callable[..., Any], + producer_function_args: Sequence[Any] | None = None, + producer_function_kwargs: dict[Any, Any] | None = None, + delivery_callback: str | None = None, + synchronous: bool | None = True, Review Comment: ```suggestion synchronous: bool = True, ``` ########## airflow/providers/apache/kafka/operators/consume_from_topic.py: ########## @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import partial +from typing import Any, Callable, Sequence + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook +from airflow.providers.apache.kafka.shared_utils import _get_callable + +VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} + + +class ConsumeFromTopicOperator(BaseOperator): + """ConsumeFromTopicOperator An operator that consumes from Kafka a topic(s) and processing the messages. + + The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them + using the user supplied callable function. 
The consumer will continue to read in batches until it reaches + the end of the log or reads a maximum number of messages is reached. + + :param topics: A list of topics or regex patterns the consumer should subsrcribe to. + :param apply_function: The function that should be applied to fetched one at a time. + name of dag file executing the function and the function name delimited by a `.` + :param apply_function_batch: The function that should be applied to a batch of messages fetched. Can not + be used with `apply_function`. Intended for transactional workloads where an expensive task might + be called before or after operations on the messages are taken. + :param apply_function_args: Additional arguments that should be applied to the callable, defaults to None + :param apply_function_kwargs: Additional key word arguments that should be applied to the callable + , defaults to None + :param consumer_config: the config dictionary for the kafka client (additional information available on the + confluent-python-kafka documentation), defaults to None, defaults to None + :param commit_cadence: When consumers should commit offsets ("never", "end_of_batch","end_of_operator"), defaults to "end_of_operator"; + if end_of_operator, the commit() is called based on the max_messsages arg. Commits are made after the operator has processed the apply_function method for the maximum messages in the operator. + if end_of_batch, the commit() is called based on the max_batch_size arg. Commits are made after each batch has processed by the apply_function method for all messages in the batch. + if never, close() is called without calling the commit() method. 
+ :param max_messages: The maximum total number of messages an operator should read from Kafka, + defaults to None + :param max_batch_size: The maximum number of messages a consumer should read when polling, defaults to 1000 + :param poll_timeout: How long the Kafka consumer should wait before determining no more messages are available, + defaults to 60 + """ + + BLUE = "#ffefeb" + ui_color = BLUE + template_fields = ( + "topics", + "apply_function", + "apply_function_args", + "apply_function_kwargs", + "config", + ) + + def __init__( + self, + topics: str | Sequence[str], + apply_function: Callable[..., Any] | str | None = None, + apply_function_batch: Callable[..., Any] | str | None = None, + apply_function_args: Sequence[Any] | None = None, + apply_function_kwargs: dict[Any, Any] | None = None, + consumer_config: dict[Any, Any] | None = None, + commit_cadence: str | None = "end_of_operator", + max_messages: int | None = None, + max_batch_size: int = 1000, + poll_timeout: float | None = 60, Review Comment: ```suggestion poll_timeout: float = 60, ``` ########## airflow/providers/apache/kafka/shared_utils.py: ########## @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import importlib +from typing import Any, Callable + + +def _get_callable(function_string: str) -> Callable: Review Comment: There is a [native `import_string` util function](https://github.com/apache/airflow/blob/4f3751aab677904f043d3c0657eb8283d93a9bbd/airflow/utils/module_loading.py#L26-L41) that might be used instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
