This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f80ab20 [FLINK-11293][kafka, test] Added timeout to delete topic
request.
f80ab20 is described below
commit f80ab20cbc5cdc80d1f0b477fd8c9e0cf9e273fb
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Jan 17 15:32:40 2019 +0100
[FLINK-11293][kafka, test] Added timeout to delete topic request.
Logs from failures show that even though the delete request never finishes,
the Kafka topics are actually deleted. Therefore this PR adds a timeout to the
delete request. If the request times out, we check whether the topic still
exists. If it does not, we continue as if the delete request had succeeded.
---
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 20 ++++++++++++++++++--
1 file changed, 18 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 12a0c04..2f16904 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -60,6 +60,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import scala.collection.mutable.ArraySeq;
@@ -85,6 +87,7 @@ public class KafkaTestEnvironmentImpl extends
KafkaTestEnvironment {
// 6 seconds is default. Seems to be too small for travis. 30 seconds
private int zkTimeout = 30000;
private Config config;
+ private static final int DELETE_TIMEOUT_SECONDS = 30;
public void setProducerSemantic(FlinkKafkaProducer.Semantic
producerSemantic) {
this.producerSemantic = producerSemantic;
@@ -155,10 +158,23 @@ public class KafkaTestEnvironmentImpl extends
KafkaTestEnvironment {
public void deleteTestTopic(String topic) {
LOG.info("Deleting topic {}", topic);
try (AdminClient adminClient =
AdminClient.create(getStandardProperties())) {
-
adminClient.deleteTopics(Collections.singleton(topic)).all().get();
+ tryDelete(adminClient, topic);
} catch (Exception e) {
e.printStackTrace();
- fail("Delete test topic : " + topic + " failed, " +
e.getMessage());
+ fail(String.format("Delete test topic : %s failed, %s",
topic, e.getMessage()));
+ }
+ }
+
+ private void tryDelete(AdminClient adminClient, String topic)
+ throws Exception {
+ try {
+
adminClient.deleteTopics(Collections.singleton(topic)).all().get(DELETE_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ LOG.info("Did not receive delete topic response within
%d seconds. Checking if it succeeded",
+ DELETE_TIMEOUT_SECONDS);
+ if
(adminClient.listTopics().names().get(DELETE_TIMEOUT_SECONDS,
TimeUnit.SECONDS).contains(topic)) {
+ throw new Exception("Topic still exists after
timeout");
+ }
}
}