becketqin commented on a change in pull request #9387: [FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids li…
URL: https://github.com/apache/flink/pull/9387#discussion_r315072045
 
 

 ##########
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##########
 @@ -92,6 +93,10 @@ public void open(Configuration configuration) throws Exception {
 
                getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);
 
+               //convert per-subtask-limit to global rate limit, as FlinkConnectorRateLimiter::setRate expects a global rate limit.
+               rateLimiter.setRate(messagePerSecondRateLimit * getRuntimeContext().getNumberOfParallelSubtasks());
 
 Review comment:
   I am not sure this is the correct logic. I think that when the rate limiter calls itself a "global" rate limit, it means the configuration itself is applied globally, but the limit still takes effect per subtask. I don't think the rate limiter communicates across subtasks in any way that would make the limit shared by all of them.
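   To make the "no communication between subtasks" point concrete, here is a minimal sketch, assuming each subtask owns its own limiter instance with no shared state. `SubtaskLimiter` is a hypothetical fixed-window limiter written only for illustration; it is not Flink's `FlinkConnectorRateLimiter` or Guava's `RateLimiter`. Because nothing is shared, the job-wide throughput is simply the sum of what each subtask admits.

```java
import java.util.ArrayList;
import java.util.List;

public class IndependentLimiters {

    /** Hypothetical fixed-window limiter: at most permitsPerSecond calls per 1-second window.
     *  Each instance keeps its own counters; nothing is shared between instances. */
    static final class SubtaskLimiter {
        private final long permitsPerSecond;
        private long currentSecond = Long.MIN_VALUE;
        private long usedInWindow = 0;

        SubtaskLimiter(long permitsPerSecond) {
            this.permitsPerSecond = permitsPerSecond;
        }

        boolean tryAcquire(long nowMillis) {
            long second = nowMillis / 1000;
            if (second != currentSecond) {
                // A new 1-second window starts: reset this instance's counter only.
                currentSecond = second;
                usedInWindow = 0;
            }
            if (usedInWindow < permitsPerSecond) {
                usedInWindow++;
                return true;
            }
            return false;
        }
    }

    /** Creates one independent limiter per subtask and counts admitted calls within one second. */
    static long totalAdmitted(int parallelism, long permitsPerSecond, int attemptsPerSubtask) {
        List<SubtaskLimiter> limiters = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            limiters.add(new SubtaskLimiter(permitsPerSecond));
        }
        long admitted = 0;
        for (SubtaskLimiter limiter : limiters) {
            for (int a = 0; a < attemptsPerSubtask; a++) {
                // All attempts use the same timestamp, so they fall into the same window.
                if (limiter.tryAcquire(0L)) {
                    admitted++;
                }
            }
        }
        return admitted;
    }

    public static void main(String[] args) {
        // Two subtasks, each limited to 100 permits/s, each attempting 500 calls in one second.
        System.out.println(totalAdmitted(2, 100, 500)); // prints 200
    }
}
```

   Each subtask hits its own cap of 100, so the job as a whole admits 200 per second; a "global" value only becomes a job-wide cap if something divides it by the parallelism before it reaches each subtask.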
   
   With the current code, if I have 2 subtasks and want each subtask to consume 100 records/second, I have to set the config to 50. This gives me 100 messages/s per subtask and 200 messages/s for the entire job, which seems very confusing.
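   The arithmetic of the 2-subtask example can be spelled out as below, under the reviewer's reading that whatever value reaches `setRate` is enforced independently by every subtask. The class and method names are hypothetical and only restate the numbers; this is not the connector's code.

```java
public class PubSubRateArithmetic {

    /** Value the PR passes to rateLimiter.setRate(...): configured limit times parallelism. */
    static long valuePassedToSetRate(long configuredLimit, int parallelism) {
        return configuredLimit * parallelism;
    }

    /** Reviewer's reading: each subtask enforces the setRate value on its own. */
    static long perSubtaskRate(long configuredLimit, int parallelism) {
        return valuePassedToSetRate(configuredLimit, parallelism);
    }

    /** With no coordination between subtasks, the job-wide rate is the sum of per-subtask rates. */
    static long jobWideRate(long configuredLimit, int parallelism) {
        return perSubtaskRate(configuredLimit, parallelism) * parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        long configuredLimit = 50;
        // prints "per subtask: 100 msg/s"
        System.out.println("per subtask: " + perSubtaskRate(configuredLimit, parallelism) + " msg/s");
        // prints "whole job: 200 msg/s"
        System.out.println("whole job: " + jobWideRate(configuredLimit, parallelism) + " msg/s");
    }
}
```

   Under this reading, the configured number (50) matches neither the per-subtask rate (100) nor the job-wide rate (200), which is the source of the confusion.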

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
