Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5282#discussion_r168377242
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1910,86 +1959,171 @@ public void cancel() {
 			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			readEnv.getConfig().disableSysoutLogging();
-			readEnv.setParallelism(parallelism);
+			if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
+				// everything is good!
+				return topicName;
+			}
+			else {
+				deleteTestTopic(topicName);
+				// fall through the loop
+			}
+		}
-			Properties readProps = (Properties) standardProps.clone();
-			readProps.setProperty("group.id", "flink-tests-validator");
-			readProps.putAll(secureProps);
-			FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
+		throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
+	}
-			readEnv
-					.addSource(consumer)
-					.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+	protected void writeAppendSequence(
+			String topicName,
+			final int originalNumElements,
+			final int numElementsToAppend,
+			final int parallelism) throws Exception {
-						private final int totalCount = parallelism * numElements;
-						private int count = 0;
+		LOG.info("\n===================================\n" +
+				"== Appending sequence of " + numElementsToAppend + " into " + topicName +
+				"===================================");
-						@Override
-						public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-							if (++count == totalCount) {
-								throw new SuccessException();
-							} else {
-								return value;
-							}
-						}
-					}).setParallelism(1)
-					.addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
+		final TypeInformation<Tuple2<Integer, Integer>> resultType =
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
-			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+				new KeyedSerializationSchemaWrapper<>(
+						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
-			Thread runner = new Thread() {
-				@Override
-				public void run() {
-					try {
-						tryExecute(readEnv, "sequence validation");
-					} catch (Throwable t) {
-						errorRef.set(t);
-					}
+		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+				new KeyedDeserializationSchemaWrapper<>(
+						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+		// -------- Write the append sequence --------
+
+		StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		writeEnv.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+			private boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+				int cnt = originalNumElements;
+				int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+				while (running && cnt < numElementsToAppend + originalNumElements) {
+					ctx.collect(new Tuple2<>(partition, cnt));
+					cnt++;
 				}
-			};
-			runner.start();
+			}
-			final long deadline = System.nanoTime() + 10_000_000_000L;
-			long delay;
-			while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
-				runner.join(delay / 1_000_000L);
+			@Override
+			public void cancel() {
+				running = false;
 			}
+		}).setParallelism(parallelism);
-			boolean success;
+		// the producer must not produce duplicates
+		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+		producerProperties.setProperty("retries", "0");
+		producerProperties.putAll(secureProps);
-			if (runner.isAlive()) {
-				// did not finish in time, maybe the producer dropped one or more records and
-				// the validation did not reach the exit point
-				success = false;
-				JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-			}
-			else {
-				Throwable error = errorRef.get();
-				if (error != null) {
-					success = false;
-					LOG.info("Attempt " + attempt + " failed with exception", error);
+		kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
+				.setParallelism(parallelism);
+
+		try {
+			writeEnv.execute("Write sequence");
+		}
+		catch (Exception e) {
+			throw new Exception("Failed to append sequence to Kafka; append job failed.", e);
+		}
+
+		LOG.info("Finished writing append sequence");
+
+		// we need to validate the sequence, because kafka's producers are not exactly once
+		LOG.info("Validating sequence");
+		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+
+		if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
+			throw new Exception("Could not append a valid sequence to Kafka.");
+		}
+	}
+
+	private boolean validateSequence(
+			final String topic,
+			final int parallelism,
+			KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema,
+			final int totalNumElements) throws Exception {
+
+		final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		readEnv.getConfig().disableSysoutLogging();
+		readEnv.setParallelism(parallelism);
+
+		Properties readProps = (Properties) standardProps.clone();
+		readProps.setProperty("group.id", "flink-tests-validator");
+		readProps.putAll(secureProps);
+		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topic, deserSchema, readProps);
+		consumer.setStartFromEarliest();
+
+		readEnv
+				.addSource(consumer)
+				.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+
+					private final int totalCount = parallelism * totalNumElements;
+					private int count = 0;
+
+					@Override
+					public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+						System.out.println(count);
--- End diff ---
Indeed, will remove.
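
Something along these lines, just a sketch that reuses the `count` / `totalCount` fields shown in the diff and the `SuccessException` exit from the removed code above (not necessarily the final shape of the patch):

    @Override
    public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
        // count every record that arrives; once all expected records
        // (parallelism * totalNumElements) have been seen, signal success
        if (++count == totalCount) {
            throw new SuccessException();
        }
        return value;
    }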
---