becketqin commented on a change in pull request #9387: [FLINK-13231] [pubsub]
Replace Max outstanding acknowledgement ids li…
URL: https://github.com/apache/flink/pull/9387#discussion_r315146496
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
##########
@@ -92,6 +93,10 @@ public void open(Configuration configuration) throws
Exception {
getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked",
this::getOutstandingMessagesToAck);
+ //convert per-subtask-limit to global rate limit, as
FlinkConnectorRateLimiter::setRate expects a global rate limit.
+ rateLimiter.setRate(messagePerSecondRateLimit *
getRuntimeContext().getNumberOfParallelSubtasks());
Review comment:
Talked to @Xeli offline. Actually the rate limit is a global rate as the
value was divided by the parallelism in the rate limiter implementation. So the
current patch does set the per task rate.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services