jason810496 commented on code in PR #52015:
URL: https://github.com/apache/airflow/pull/52015#discussion_r2160301422
##########
providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py:
##########
@@ -184,3 +182,27 @@ def execute(self, context) -> Any:
consumer.close()
return
+
+ def _validate_commit_cadence(self):
+ """Validate the commit cadence configuration."""
+ if self.commit_cadence and self.commit_cadence not in
VALID_COMMIT_CADENCE:
+ raise AirflowException(
+ f"commit_cadence must be one of {VALID_COMMIT_CADENCE}. Got
{self.commit_cadence}"
+ )
+
+ kafka_config =
self.hook.get_connection(self.kafka_config_id).extra_dejson
+ # Same as kafka's behavior, default to "true" if not set
+ enable_auto_commit = kafka_config.get("enable.auto.commit",
"true").lower()
+
+ if self.commit_cadence and enable_auto_commit != "false":
+ self.log.warning(
Review Comment:
I decide not to raise error but log warning to user for backward
compatibitlity.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]