glaksh100 commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r261769488
##########
File path: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
##########
@@ -139,4 +159,118 @@ public void testCommitOffsetsToKafka() throws Exception {
public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
runAutoOffsetRetrievalAndCommitToKafka();
}
+
+ /**
+ * Kafka09 specific RateLimiter test.
+ */
+ @Test(timeout = 60000)
+ public void testRateLimitedConsumer() throws Exception {
+ final String testTopic = "test-topic";
+ createTestTopic(testTopic, 3, 1);
+
+ // ---------- Produce a stream into Kafka -------------------
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+
+ DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+ private static final long serialVersionUID = 1L;
+ private volatile boolean running = true; // volatile: cancel() is called from another thread
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ long i = 0;
+ while (running) {
+ byte[] data = new byte[] {1};
+ ctx.collect(new String(data)); // 1 byte
+ if (++i == 100L) { // stop after exactly 100 records
+ running = false;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ stream.addSink(new FlinkKafkaProducer09<>(testTopic, new SimpleStringSchema(), standardProps)).setParallelism(1);
+ env.execute("Produce 100 bytes of data to test topic");
+
+ // ---------- Consume from Kafka in a rate-limited way -----------
+
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+ FlinkKafkaConsumer09<String> consumer09 = new FlinkKafkaConsumer09<>(testTopic,
+ new StringDeserializer(), standardProps);
+
+ // ---------- RateLimiter config -------------
+ FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
+ long globalRate = 3; // bytes/second
+ rateLimiter.setRate(globalRate);
+ consumer09.setRateLimiter(rateLimiter);
+
+ DataStream<String> stream1 = env.addSource(consumer09);
+ stream1.addSink(new SinkFunction<String>() {
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+
+ }
+
+ });
+ long startTime = System.currentTimeMillis();
+ env.execute("Consume 100 bytes of data from test topic");
+ long endTime = System.currentTimeMillis();
+
+ // ------- Assertions --------------
+ Assert.assertNotNull(consumer09.getRateLimiter());
+ Assert.assertEquals(globalRate, consumer09.getRateLimiter().getRate());
+
+ // Approximate bytes/second read based on job execution time.
+ long bytesPerSecond = 100 * 1000L / (endTime - startTime);
+ Assert.assertTrue(bytesPerSecond > 0);
+ Assert.assertTrue(bytesPerSecond <= globalRate);
Review comment:
Yes, that is correct. Without the rate limiter set, the observed rate is close to 45 bytes/second.
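
For context, the throttling behavior the test asserts on can be sketched without Flink. The following is a hypothetical, minimal byte-rate limiter (`SimpleByteRateLimiter` is an illustrative name, not a Flink or Guava API): each `acquire` charges the cost of the bytes against a shared schedule and blocks the caller, so sustained throughput stays at or below the configured bytes/second, which is exactly what the `bytesPerSecond <= globalRate` assertion checks.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical minimal rate limiter; a sketch of the throttling idea,
// NOT the Flink GuavaFlinkConnectorRateLimiter implementation.
public class SimpleByteRateLimiter {
    private final double bytesPerSecond;
    // Earliest time (in nanos) at which the next acquire may return.
    private long nextFreeTimeNanos = System.nanoTime();

    public SimpleByteRateLimiter(double bytesPerSecond) {
        this.bytesPerSecond = bytesPerSecond;
    }

    /** Blocks the caller until {@code bytes} may pass at the configured rate. */
    public synchronized void acquire(int bytes) throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = Math.max(0L, nextFreeTimeNanos - now);
        // Charge the cost of these bytes against future callers.
        nextFreeTimeNanos = Math.max(now, nextFreeTimeNanos)
                + (long) (bytes * 1_000_000_000L / bytesPerSecond);
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleByteRateLimiter limiter = new SimpleByteRateLimiter(100); // 100 bytes/sec
        long start = System.nanoTime();
        for (int i = 0; i < 10; i++) {
            limiter.acquire(1); // one 1-byte record per acquire, like the test's source
        }
        double elapsed = (System.nanoTime() - start) / 1e9;
        // 10 bytes at 100 bytes/sec imposes roughly 0.09s of waiting here,
        // so the measured rate lands at or below the configured one.
        System.out.printf("~%.1f bytes/second%n", 10 / elapsed);
    }
}
```

The same reasoning explains the test's timing: at 3 bytes/second, consuming 100 one-byte records needs on the order of 33 seconds, which is why the test's 60-second timeout leaves headroom.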
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services