[ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492675#comment-14492675 ]
ASF GitHub Bot commented on FLINK-1753: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28257271 --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java --- @@ -524,13 +537,149 @@ public void cancel() { } } + private static boolean leaderHasShutDown = false; + + @Test + public void brokerFailureTest() throws Exception { + String topic = "brokerFailureTestTopic"; + + createTestTopic(topic, 2, 2); - private void createTestTopic(String topic, int numberOfPartitions) { KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); - kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1); + final String leaderToShutDown = + kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString(); + + final Thread brokerShutdown = new Thread(new Runnable() { + @Override + public void run() { + shutdownKafkaBroker = false; + while (!shutdownKafkaBroker) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + LOG.warn("Interruption", e); + } + } + + for (KafkaServer kafkaServer : brokers) { + if (leaderToShutDown.equals( + kafkaServer.config().advertisedHostName() + + ":" + + kafkaServer.config().advertisedPort() + )) { + LOG.info("Killing Kafka Server {}", leaderToShutDown); + kafkaServer.shutdown(); + leaderHasShutDown = true; + break; + } + } + } + }); + brokerShutdown.start(); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); + --- End diff -- You can add classes from other module's test/ directory as a maven dependency with this mechanism: https://maven.apache.org/plugins/maven-jar-plugin/examples/create-test-jar.html > Add more tests for Kafka Connectors > ----------------------------------- > > Key: FLINK-1753 > URL: https://issues.apache.org/jira/browse/FLINK-1753 > Project: Flink > Issue Type: Improvement > Components: Streaming > Affects Versions: 0.9 > Reporter: Robert Metzger > Assignee: Gábor Hermann > > The current {{KafkaITCase}} is only doing a single test. > We need to refactor that test so that it brings up a Kafka/Zookeeper server > and than performs various tests: > Tests to include: > - A topology with non-string types MERGED IN 359b39c3 > - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3 > - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3 > - Kafka broker failure. > - Flink TaskManager failure -- This message was sent by Atlassian JIRA (v6.3.4#6332)