ahipp13 opened a new issue, #32926: URL: https://github.com/apache/airflow/issues/32926
### Apache Airflow version

2.6.3

### What happened

Ran apache-airflow-providers-apache-kafka version 1.1.2 and got this error:

### What you think should happen instead

I was not at the end of my topic and I did not set a max messages, so this should have processed another 1000 messages.

### How to reproduce

Run the provider on a topic that has more than 1000 messages.

### Operating System

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

### Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka version 1.1.2

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

Deploying onto Kubernetes

### Anything else

This problem occurs every time I run it. The problem looks to be in `consume.py`, as I am using the `ConsumeFromTopicOperator`. Debugging a little, the first 1000 messages run fine, but the second time through the loop `batch_size` is set wrong:

I think this problem stems from two lines. This line:

```python
messages_left -= len(msgs)
```

sets `messages_left` to -999, but then this line:

```python
batch_size = self.max_batch_size if messages_left > self.max_batch_size else messages_left
```

may have the logic wrong. I think it should be a less-than sign instead of an equals sign.

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
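For readers following the diagnosis in the issue, here is a minimal, self-contained sketch of how the quoted ternary misbehaves once `messages_left` goes negative, along with one possible correction. `next_batch_size` and `next_batch_size_fixed` are hypothetical helper names for illustration only, not functions in the provider, and the fix is a guess at the intended behavior rather than the actual patch:

```python
# Illustrative sketch of the batch-size ternary quoted from consume.py.
# next_batch_size / next_batch_size_fixed are hypothetical helpers, not
# code from apache-airflow-providers-apache-kafka.

def next_batch_size(messages_left: int, max_batch_size: int = 1000) -> int:
    # The quoted ternary: once messages_left goes negative, this returns
    # a negative batch size instead of stopping the consume loop.
    return max_batch_size if messages_left > max_batch_size else messages_left

print(next_batch_size(2500))   # healthy case: full batch of 1000
print(next_batch_size(-999))   # the reported case: a negative batch size

def next_batch_size_fixed(messages_left, max_batch_size: int = 1000) -> int:
    # One possible fix: treat "no limit" explicitly and never go below
    # zero, so a zero batch size can signal the loop to stop.
    if messages_left is None:  # max_messages was never set
        return max_batch_size
    return min(messages_left, max_batch_size) if messages_left > 0 else 0

print(next_batch_size_fixed(None))  # no limit: keep taking full batches
print(next_batch_size_fixed(-999))  # overrun: stop instead of going negative
```

With the original logic, the second pass through the loop asks the consumer for a negative number of messages, which matches the observed behavior of stopping after the first 1000.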
