@pnowojski I tried to extend from Kafka011ITCase : 

```java
public class Kafka10ITCase extends Kafka011ITCase
```

but the prepare method : 

```java
@BeforeClass
public static void prepare() throws ClassNotFoundException {
    KafkaProducerTestBase.prepare();    //here
    ((KafkaTestEnvironmentImpl) 
kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
}
```
will trigger the method call chain : 

```java
protected static void startClusters(boolean secureMode, boolean 
hideKafkaBehindProxy) throws ClassNotFoundException {

    // dynamically load the implementation for the test
    //here will load KafkaTestEnvironmentImpl from connector 0.11, not 1.0
    Class<?> clazz = 
Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
    
    kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);

    LOG.info("Starting KafkaTestBase.prepare() for Kafka " + 
kafkaServer.getVersion());
```

the `startClusters` is a static class, the `clazz` is the instance of 
KafkaTestEnvironmentImpl from 0.11 module, and we can not use polymorphism to 
get a instance of KafkaTestEnvironmentImpl from 1.0.

[ Full content available at: https://github.com/apache/flink/pull/6577 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to