This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a67afb20402040f4c489dad7285308439c4776e8 Author: Fabian Paul <[email protected]> AuthorDate: Tue Nov 30 10:25:56 2021 +0100 [FLINK-15493][test] Add retry rule for all tests based on KafkaTestBase --- .../streaming/connectors/kafka/FlinkKafkaProducerITCase.java | 8 -------- .../apache/flink/streaming/connectors/kafka/KafkaTestBase.java | 9 +++++++++ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java index e9217f7..3194d49 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java @@ -28,13 +28,10 @@ import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationS import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.testutils.junit.RetryOnFailure; -import org.apache.flink.testutils.junit.RetryRule; import kafka.server.KafkaServer; import org.apache.kafka.common.errors.ProducerFencedException; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import java.util.ArrayList; @@ -57,13 +54,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; /** IT cases for the {@link FlinkKafkaProducer}. */ -// This test is known to be unstable due to a known issue in Kafka. -// It has been solved after bumping Kafka to 2.8.1 on the release 1.15 -@RetryOnFailure(times = 2) public class FlinkKafkaProducerITCase extends KafkaTestBase { - @Rule public final RetryRule retryRule = new RetryRule(); - protected String transactionalId; protected Properties extraProperties; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 9a38990..42d8a61 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.SuccessException; +import org.apache.flink.testutils.junit.RetryOnFailure; +import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -38,6 +40,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,8 +72,12 @@ import static org.junit.Assert.fail; * href="https://github.com/sakserv/hadoop-mini-clusters"> * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed), as per commit * <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i> + * + * <p>Tests inheriting from this class are known to be unstable due to the test setup. All tests + * implemented in subclasses will be retried on failures. */ @SuppressWarnings("serial") +@RetryOnFailure(times = 3) public abstract class KafkaTestBase extends TestLogger { public static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class); @@ -89,6 +96,8 @@ public abstract class KafkaTestBase extends TestLogger { public static Properties secureProps = new Properties(); + @Rule public final RetryRule retryRule = new RetryRule(); + // ------------------------------------------------------------------------ // Setup and teardown of the mini clusters // ------------------------------------------------------------------------
