[ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492675#comment-14492675 ]

ASF GitHub Bot commented on FLINK-1753:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/589#discussion_r28257271
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
    @@ -524,13 +537,149 @@ public void cancel() {
                }
        }
     
    +   private static boolean leaderHasShutDown = false;
    +
    +   @Test
    +   public void brokerFailureTest() throws Exception {
    +           String topic = "brokerFailureTestTopic";
    +
    +           createTestTopic(topic, 2, 2);
     
    -   private void createTestTopic(String topic, int numberOfPartitions) {
                KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
    -           kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
    +           final String leaderToShutDown =
    +                           kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
    +
    +           final Thread brokerShutdown = new Thread(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           shutdownKafkaBroker = false;
    +                           while (!shutdownKafkaBroker) {
    +                                   try {
    +                                           Thread.sleep(10);
    +                                   } catch (InterruptedException e) {
    +                                           LOG.warn("Interruption", e);
    +                                   }
    +                           }
    +
    +                           for (KafkaServer kafkaServer : brokers) {
    +                                   if (leaderToShutDown.equals(
    +                                                   kafkaServer.config().advertisedHostName()
    +                                                                   + ":"
    +                                                                   + kafkaServer.config().advertisedPort()
    +                                   )) {
    +                                           LOG.info("Killing Kafka Server {}", leaderToShutDown);
    +                                           kafkaServer.shutdown();
    +                                           leaderHasShutDown = true;
    +                                           break;
    +                                   }
    +                           }
    +                   }
    +           });
    +           brokerShutdown.start();
    +
    +           final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    +
    --- End diff --
    
    You can add classes from another module's test/ directory as a Maven dependency with this mechanism:
    
    https://maven.apache.org/plugins/maven-jar-plugin/examples/create-test-jar.html
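
    The linked mechanism boils down to two pom.xml fragments -- a sketch, with the consuming artifactId being a hypothetical placeholder: the module that owns the test classes attaches a test-jar artifact, and the module that wants to reuse them declares a dependency of type test-jar.

    ```xml
    <!-- In the module that owns the test classes: attach a test-jar artifact -->
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-jar-plugin</artifactId>
          <executions>
            <execution>
              <goals>
                <goal>test-jar</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>

    <!-- In the module that reuses them (e.g. the streaming connectors tests) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>some-other-module</artifactId> <!-- hypothetical artifactId -->
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    ```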



> Add more tests for Kafka Connectors
> -----------------------------------
>
>                 Key: FLINK-1753
>                 URL: https://issues.apache.org/jira/browse/FLINK-1753
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Gábor Hermann
>
> The current {{KafkaITCase}} is only doing a single test.
> We need to refactor that test so that it brings up a Kafka/ZooKeeper server
> and then performs various tests:
> Tests to include:
> - A topology with non-string types MERGED IN 359b39c3
> - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3
> - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3
> - Kafka broker failure.
> - Flink TaskManager failure



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
