dylanbstorey commented on code in PR #30175:
URL: https://github.com/apache/airflow/pull/30175#discussion_r1140900144


##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from Kafka a 
topic(s) and processing the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subsrcribe to.
+    :param apply_function: The function that should be applied to fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional key word arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
+        confluent-python-kafka documentation), defaults to None, defaults to 
None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messsages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function or None
+        self.apply_function_batch = apply_function_batch or None
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.config = consumer_config or {}
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True
+        self.max_batch_size = max_batch_size
+        self.poll_timeout = poll_timeout
+
+        if self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(

Review Comment:
   No strong feelings. 
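   For what it's worth, a task using the operator as drafted above might look roughly like the
   sketch below; the task_id, topic name, and dotted callable path are illustrative only.

       # Hypothetical usage of the operator as drafted in this PR.
       consume = ConsumeFromTopicOperator(
           task_id="consume_events",                    # illustrative task id
           topics=["test_topic"],                       # illustrative topic
           apply_function="dag_module.handle_message",  # dotted path resolved via _get_callable
           commit_cadence="end_of_batch",               # must be one of VALID_COMMIT_CADENCE
           max_messages=10000,
           max_batch_size=500,
           poll_timeout=30,
       )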



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from Kafka a 
topic(s) and processing the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subsrcribe to.
+    :param apply_function: The function that should be applied to fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional key word arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
+        confluent-python-kafka documentation), defaults to None, defaults to 
None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messsages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function or None
+        self.apply_function_batch = apply_function_batch or None
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.config = consumer_config or {}
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True

Review Comment:
   If you go further down you'll see the while loop. When `max_messages` == True, it will read to the end of the log as the default behavior. 
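   To illustrate the default being described here (a minimal sketch, not the actual execute()
   loop from this PR): with max_messages=None, `max_messages or True` evaluates to True, so the
   read loop is bounded only by reaching the end of the log rather than by a message count.

       max_messages = None
       effective_limit = max_messages or True  # falls back to True when no limit is given
       print(effective_limit)                  # True -> read until the end of the log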



##########
airflow/providers/apache/kafka/provider.yaml:
##########


Review Comment:
   Missed this after Kaxil's feedback earlier. Thanks!



##########
airflow/providers/apache/kafka/shared_utils.py:
##########
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib
+from typing import Any, Callable
+
+
+def _get_callable(function_string: str) -> Callable:

Review Comment:
   There is. 
   
   The docstring indicates "attr/class"; while it's true that it is pretty much the same code, I'm not sure the intent will stay consistent in the future.  
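   For reference, the kind of importlib-based resolution being discussed would look roughly like
   the sketch below; the actual implementation in shared_utils.py may differ.

       import importlib
       from typing import Any, Callable

       def _get_callable(function_string: str) -> Callable[..., Any]:
           # e.g. "dag_module.handle_message" -> import dag_module, return its handle_message attribute
           module_name, attr_name = function_string.rsplit(".", 1)
           module = importlib.import_module(module_name)
           return getattr(module, attr_name)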



##########
airflow/providers/apache/kafka/operators/consume_from_topic.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import partial
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
+from airflow.providers.apache.kafka.shared_utils import _get_callable
+
+VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
+
+
+class ConsumeFromTopicOperator(BaseOperator):
+    """ConsumeFromTopicOperator An operator that consumes from Kafka a 
topic(s) and processing the messages.
+
+    The operator creates a Kafka consumer that reads a batch of messages from 
the cluster and processes them
+    using the user supplied callable function. The consumer will continue to 
read in batches until it reaches
+    the end of the log or reads a maximum number of messages is reached.
+
+    :param topics: A list of topics or regex patterns the consumer should 
subsrcribe to.
+    :param apply_function: The function that should be applied to fetched one 
at a time.
+        name of dag file executing the function and the function name 
delimited by a `.`
+    :param apply_function_batch: The function that should be applied to a 
batch of messages fetched. Can not
+        be used with `apply_function`. Intended for transactional workloads 
where an expensive task might
+        be called before or after operations on the messages are taken.
+    :param apply_function_args: Additional arguments that should be applied to 
the callable, defaults to None
+    :param apply_function_kwargs: Additional key word arguments that should be 
applied to the callable
+        , defaults to None
+    :param consumer_config: the config dictionary for the kafka client 
(additional information available on the
+        confluent-python-kafka documentation), defaults to None, defaults to 
None
+    :param commit_cadence: When consumers should commit offsets ("never", 
"end_of_batch","end_of_operator"), defaults to "end_of_operator";
+         if end_of_operator, the commit() is called based on the max_messsages 
arg. Commits are made after the operator has processed the apply_function 
method for the maximum messages in the operator.
+         if end_of_batch, the commit() is called based on the max_batch_size 
arg. Commits are made after each batch has processed by the apply_function 
method for all messages in the batch.
+         if never,  close() is called without calling the commit() method.
+    :param max_messages: The maximum total number of messages an operator 
should read from Kafka,
+         defaults to None
+    :param max_batch_size: The maximum number of messages a consumer should 
read when polling, defaults to 1000
+    :param poll_timeout: How long the Kafka consumer should wait before 
determining no more messages are available,
+         defaults to 60
+    """
+
+    BLUE = "#ffefeb"
+    ui_color = BLUE
+    template_fields = (
+        "topics",
+        "apply_function",
+        "apply_function_args",
+        "apply_function_kwargs",
+        "config",
+    )
+
+    def __init__(
+        self,
+        topics: str | Sequence[str],
+        apply_function: Callable[..., Any] | str | None = None,
+        apply_function_batch: Callable[..., Any] | str | None = None,
+        apply_function_args: Sequence[Any] | None = None,
+        apply_function_kwargs: dict[Any, Any] | None = None,
+        consumer_config: dict[Any, Any] | None = None,
+        commit_cadence: str | None = "end_of_operator",
+        max_messages: int | None = None,
+        max_batch_size: int = 1000,
+        poll_timeout: float | None = 60,
+        **kwargs: Any,
+    ) -> None:
+
+        super().__init__(**kwargs)
+
+        self.topics = topics
+        self.apply_function = apply_function or None
+        self.apply_function_batch = apply_function_batch or None
+        self.apply_function_args = apply_function_args or ()
+        self.apply_function_kwargs = apply_function_kwargs or {}
+        self.config = consumer_config or {}
+        self.commit_cadence = commit_cadence
+        self.max_messages = max_messages or True
+        self.max_batch_size = max_batch_size
+        self.poll_timeout = poll_timeout
+
+        if self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(
+                f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got 
{self.commit_cadence}"
+            )
+
+        if self.max_messages and self.max_batch_size > self.max_messages:
+            self.log.warning(
+                f"max_batch_size ({self.max_batch_size}) > max_messages"
+                f" ({self.max_messages}). Setting max_messages to"
+                f" {self.max_batch_size}"
+            )

Review Comment:
   looks like the pre-commit hook didn't catch this one.
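   For reference, the deferred %-style formatting that such hooks usually enforce for logging
   calls would look roughly like this (a sketch, not the exact replacement for this PR):

       # Arguments are only interpolated if the warning is actually emitted.
       self.log.warning(
           "max_batch_size (%s) > max_messages (%s). Setting max_messages to %s",
           self.max_batch_size,
           self.max_messages,
           self.max_batch_size,
       )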



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
