leonardBang commented on code in PR #27134:
URL: https://github.com/apache/flink/pull/27134#discussion_r2476731768
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java:
##########
@@ -435,4 +500,73 @@ private boolean isEndOfStreamReached(T record) {
return isStreamEnd;
}
}
+
+ /**
+ * A wrapper around {@link SourceOutput} that counts the number of records
during the current
+ * rate-limiting window.
+ *
+ * <p>This wrapper is used when rate limiting is enabled to track how many
records have been
+ * emitted since the last rate limit check, allowing the reader to
properly apply backpressure
+ * when the rate limit is exceeded.
+ *
+ * @param <T> The type of records being emitted
+ */
+ private static final class RateLimitingSourceOutputWrapper<T> implements
SourceOutput<T> {
+ /** The underlying source output to delegate to. */
+ final SourceOutput<T> sourceOutput;
+
+ /** Count of records handled during the current rate-limiting window.
*/
+ int currentWindowRecordCount;
Review Comment:
Could be private ?
```suggestion
private int currentWindowRecordCount;
```
##########
flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java:
##########
@@ -27,14 +27,25 @@
/** The interface to rate limit execution of methods. */
@NotThreadSafe
@Experimental
-public interface RateLimiter {
+public interface RateLimiter<S> {
Review Comment:
I saw the definition `private final RateLimiter<SplitT> rateLimiter;` in
SourceReaderBase, what's meaning of generic type `<S>`? and this is a breaking
change from `RateLimiter` to `RateLimiter<S>`.
##########
flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java:
##########
@@ -27,14 +27,14 @@
* A factory for {@link RateLimiter RateLimiters} which apply rate-limiting to
a source sub-task.
*/
@Experimental
-public interface RateLimiterStrategy extends Serializable {
+public interface RateLimiterStrategy<S> extends Serializable {
Review Comment:
Please add java doc for `<S>`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]