Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5669#discussion_r177747320
--- Diff:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
---
@@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws
Exception {
final AtomicReference<Throwable> error = new
AtomicReference<>();
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
--- End diff --
Please unify and extract the common code between `runCancelingOnFullInputTest`
and `runCancelingOnEmptyInputTest`, for example into:
```
private void runCancelingTest(boolean emptyInput) throws Exception {
final String topic = emptyInput ? "cancelingOnEmptyInputTopic"
: "cancelingOnFullTopic";
final int parallelism = 3;
createTestTopic(topic, parallelism, 1);
// launch a producer thread
DataGenerators.InfiniteStringsGenerator generator =
new
DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
if (!emptyInput) {
generator.start();
}
// launch a consumer asynchronously
final AtomicReference<Throwable> jobError = new
AtomicReference<>();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(100);
env.getConfig().disableSysoutLogging();
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
FlinkKafkaConsumerBase<String> source =
kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
env.addSource(source).addSink(new DiscardingSink<String>());
JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
final JobID jobId = jobGraph.getJobID();
final Runnable jobRunner = new Runnable() {
@Override
public void run() {
try {
client.setDetached(false);
client.submitJob(jobGraph,
KafkaConsumerTestBase.class.getClassLoader());
}
catch (Throwable t) {
jobError.set(t);
}
}
};
Thread runnerThread = new Thread(jobRunner, "program runner
thread");
runnerThread.start();
// wait a bit before canceling
Thread.sleep(2000);
Throwable failueCause = jobError.get();
if (failueCause != null) {
failueCause.printStackTrace();
Assert.fail("Test failed prematurely with: " +
failueCause.getMessage());
}
// cancel
client.cancel(jobId);
// wait for the program to be done and validate that we failed
with the right exception
runnerThread.join();
assertEquals(JobStatus.CANCELED,
client.getJobStatus(jobId).get());
if (generator.isAlive()) {
generator.shutdown();
generator.join();
}
else if (!emptyInput) {
Throwable t = generator.getError();
if (t != null) {
t.printStackTrace();
fail("Generator failed: " + t.getMessage());
} else {
fail("Generator failed with no exception");
}
}
deleteTestTopic(topic);
}
```
---