amoghrajesh commented on code in PR #52015:
URL: https://github.com/apache/airflow/pull/52015#discussion_r2162953467


##########
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py:
##########
@@ -82,7 +82,7 @@ def __init__(
         apply_function_batch: Callable[..., Any] | str | None = None,
         apply_function_args: Sequence[Any] | None = None,
         apply_function_kwargs: dict[Any, Any] | None = None,
-        commit_cadence: str | None = "end_of_operator",
+        commit_cadence: str = "end_of_operator",

Review Comment:
   Hm, I wonder why `None` was allowed here before.
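
One possible explanation, offered as a guess rather than a statement of the original author's intent: the old end-of-run check quoted in a later hunk of this review was the truthiness test `if self.commit_cadence:`, under which `None` would be the only way to skip the final commit, since the string `"never"` is truthy. A minimal sketch of the difference follows; both function names are hypothetical and not part of the provider.

```python
from typing import Optional


def should_commit_truthy(commit_cadence: Optional[str]) -> bool:
    # Mirrors the old check `if self.commit_cadence:`.
    # Any non-empty string is truthy, including "never".
    return bool(commit_cadence)


def should_commit_explicit(commit_cadence: str) -> bool:
    # Mirrors the new check `if self.commit_cadence != "never":`.
    return commit_cadence != "never"


if __name__ == "__main__":
    for value in (None, "never", "end_of_operator"):
        print(repr(value), "truthy check commits:", should_commit_truthy(value))
    for value in ("never", "end_of_operator"):
        print(repr(value), "explicit check commits:", should_commit_explicit(value))
```

With the explicit comparison in place, `"never"` carries the "do not commit" meaning on its own, which is consistent with dropping `None` from the annotation.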



##########
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py:
##########
@@ -184,3 +182,27 @@ def execute(self, context) -> Any:
         consumer.close()
 
         return
+
+    def _validate_commit_cadence(self):
+        """Validate the commit cadence configuration."""
+        if self.commit_cadence and self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(
+                f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got {self.commit_cadence}"
+            )
+
+        kafka_config = self.hook.get_connection(self.kafka_config_id).extra_dejson
+        # Same as kafka's behavior, default to "true" if not set
+        enable_auto_commit = kafka_config.get("enable.auto.commit", "true").lower()
+
+        if self.commit_cadence and enable_auto_commit != "false":
+            self.log.warning(

Review Comment:
   Sounds good!
   



##########
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py:
##########
@@ -177,10 +176,33 @@ def execute(self, context) -> Any:
                 self.log.info("committing offset at %s", self.commit_cadence)
                 consumer.commit()
 
-        if self.commit_cadence:
+        if self.commit_cadence != "never":
             self.log.info("committing offset at %s", self.commit_cadence)
             consumer.commit()
 
         consumer.close()
 
         return
+
+    def _validate_commit_cadence_on_construct(self):
+        """Validate the commit_cadence parameter when the operator is constructed."""
+        if self.commit_cadence and self.commit_cadence not in VALID_COMMIT_CADENCE:
+            raise AirflowException(
+                f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got {self.commit_cadence}"
+            )
+
+    def _validate_commit_cadence_before_execute(self):
+        """Validate the commit_cadence parameter before executing the operator."""
+        kafka_config = self.hook.get_connection(self.kafka_config_id).extra_dejson
+        # Same as kafka's behavior, default to "true" if not set
+        enable_auto_commit = str(kafka_config.get("enable.auto.commit", "true")).lower()
+
+        if self.commit_cadence and enable_auto_commit != "false":
+            self.log.warning(
+                "To respect commit_cadence='%s', "
+                "'enable.auto.commit' should be set to 'false' in the Kafka connection configuration. "
+                "Currently, 'enable.auto.commit' is not explicitly set, so it defaults to 'true', which causes "
+                "the consumer to auto-commit offsets every 5 seconds. "
+                "See: https://kafka.apache.org/documentation/#consumerconfigs_enable.auto.commit",

Review Comment:
   ```suggestion
                   "See: https://kafka.apache.org/documentation/#consumerconfigs_enable.auto.commit for more information",
   ```

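
For readers following the split into a construct-time check and a pre-execute check, here is a minimal standalone sketch of the same logic using a plain dict in place of the Airflow connection extras. Assumptions: the contents of `VALID_COMMIT_CADENCE` are guessed (only `"end_of_operator"` and `"never"` appear in this diff), `ValueError` stands in for `AirflowException`, and both function names are hypothetical.

```python
from typing import Any, Dict

# Assumed allowed values; the diff only shows "end_of_operator" and "never".
VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}


def validate_on_construct(commit_cadence: str) -> None:
    # Fail fast at construction time, before any connection lookup is needed.
    if commit_cadence not in VALID_COMMIT_CADENCE:
        raise ValueError(
            f"commit_cadence must be one of {sorted(VALID_COMMIT_CADENCE)}. Got {commit_cadence}"
        )


def auto_commit_enabled(kafka_config: Dict[str, Any]) -> bool:
    # Kafka defaults enable.auto.commit to true when the key is absent.
    # str() lets a JSON boolean (False) and a string ("false") be treated alike;
    # calling .lower() directly on a bool would raise AttributeError.
    value = str(kafka_config.get("enable.auto.commit", "true")).lower()
    return value != "false"


if __name__ == "__main__":
    validate_on_construct("end_of_operator")
    for config in ({}, {"enable.auto.commit": False}, {"enable.auto.commit": "false"}):
        print(config, "-> warn:", auto_commit_enabled(config))
```

The `str(...)` wrapper is the difference between the two versions of the hunk quoted above: connection extras parsed from JSON may hold a real boolean rather than a string.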


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
