ahipp13 opened a new issue, #32926:
URL: https://github.com/apache/airflow/issues/32926

   ### Apache Airflow version
   
   2.6.3
   
   ### What happened
   
   I ran apache-airflow-providers-apache-kafka version 1.1.2 and got this 
error:
   
   
![image](https://github.com/apache/airflow/assets/118911990/bbb24811-8770-4210-b701-b4aa696c0fdf)
   
   
   ### What you think should happen instead
   
   I was not at the end of my topic and I did not set a maximum number of 
messages, so this should have processed another 1000 messages.
   
   ### How to reproduce
   
   Run the ConsumeFromTopicOperator, without setting a maximum number of 
messages, on a topic that has more than 1000 messages.
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" 
VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian 
HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" 
BUG_REPORT_URL="https://bugs.debian.org/"
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-kafka version 1.1.2
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Deploying onto Kubernetes
   
   ### Anything else
   
   This problem occurs every time I run it. The problem appears to be in 
"consume.py", since I am using the ConsumeFromTopicOperator. From a little 
debugging, the first 1000 messages are processed fine, but on the second pass 
through the loop batch_size is set incorrectly:
   
   
![image](https://github.com/apache/airflow/assets/118911990/1c571a44-d1be-4eeb-b87b-dc4db66d120b)
   
   I think this problem stems from two lines. This line: `messages_left -= 
len(msgs)` sets messages_left to -999, and then this line: `batch_size = 
self.max_batch_size if messages_left > self.max_batch_size else messages_left` 
may have the logic wrong, I think. Because -999 is not greater than 
max_batch_size, batch_size is set to -999. I think it should be a less-than 
sign instead of a greater-than sign.
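
To make the arithmetic concrete, here is a minimal standalone sketch of the two lines quoted above. It does not import the provider. It assumes that the counter starts as the boolean `True` when no maximum is set (an inference from the reported -999, since `True - 1000 == -999` in Python) and that `max_batch_size` is 1000. `quoted_batch_size` and `guarded_batch_size` are hypothetical helper names, and the guard is only one possible fix, not the provider's actual patch.

```python
MAX_BATCH_SIZE = 1000  # assumed, matching the 1000-message batches in the report


def quoted_batch_size(messages_left, max_batch_size=MAX_BATCH_SIZE):
    # The conditional expression quoted in the report, unchanged.
    return max_batch_size if messages_left > max_batch_size else messages_left


def guarded_batch_size(messages_left, max_batch_size=MAX_BATCH_SIZE):
    # Hypothetical guard: None means "no limit", so ask for a full batch;
    # otherwise clamp the result to the range [0, max_batch_size].
    if messages_left is None:
        return max_batch_size
    return max(0, min(max_batch_size, messages_left))


# Assumption: with no maximum set, the counter starts as True.
messages_left = True
messages_left -= 1000  # first batch of 1000 messages consumed
second_batch = quoted_batch_size(messages_left)
print(messages_left, second_batch)  # -999 -999
```

Simply flipping the comparison to `<` would return 1000 for a counter of -999, but it would also return 5000 for a counter of 5000, which exceeds the batch limit. A clamp like the one in `guarded_batch_size`, or tracking "no limit" separately from the counter, may be the safer direction.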
   
   
   
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

