[FLINK-7739][kafka-tests] Throttle down data producing thread

This closes #4751.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/152f6c9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/152f6c9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/152f6c9a

Branch: refs/heads/master
Commit: 152f6c9aff44c62744d2294b220664efc14acec9
Parents: 185c807
Author: Piotr Nowojski <[email protected]>
Authored: Thu Sep 28 16:59:09 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Oct 31 00:05:00 2017 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/KafkaProducerTestBase.java   | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/152f6c9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index e1ba074..f81fcf1 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -139,6 +139,9 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                                        while (running) {
                                                ctx.collect(new Tuple2<Long, 
String>(cnt, "kafka-" + cnt));
                                                cnt++;
+                                               if (cnt % 100 == 0) {
+                                                       Thread.sleep(1);
+                                               }
                                        }
                                }
 

Reply via email to