This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a67afb20402040f4c489dad7285308439c4776e8
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Nov 30 10:25:56 2021 +0100

    [FLINK-15493][test] Add retry rule for all tests based on KafkaTestBase
---
 .../streaming/connectors/kafka/FlinkKafkaProducerITCase.java     | 8 --------
 .../apache/flink/streaming/connectors/kafka/KafkaTestBase.java   | 9 +++++++++
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index e9217f7..3194d49 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -28,13 +28,10 @@ import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationS
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.testutils.junit.RetryOnFailure;
-import org.apache.flink.testutils.junit.RetryRule;
 
 import kafka.server.KafkaServer;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -57,13 +54,8 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /** IT cases for the {@link FlinkKafkaProducer}. */
-// This test is known to be unstable due to a known issue in Kafka.
-// It has been solved after bumping Kafka to 2.8.1 on the release 1.15
-@RetryOnFailure(times = 2)
 public class FlinkKafkaProducerITCase extends KafkaTestBase {
 
-    @Rule public final RetryRule retryRule = new RetryRule();
-
     protected String transactionalId;
     protected Properties extraProperties;
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 9a38990..42d8a61 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
@@ -38,6 +40,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,8 +72,12 @@ import static org.junit.Assert.fail;
  * href="https://github.com/sakserv/hadoop-mini-clusters">
  * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed), as per commit
  * <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i>
+ *
+ * <p>Tests inheriting from this class are known to be unstable due to the test setup. All tests
+ * implemented in subclasses will be retried on failures.
  */
 @SuppressWarnings("serial")
+@RetryOnFailure(times = 3)
 public abstract class KafkaTestBase extends TestLogger {
 
     public static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
@@ -89,6 +96,8 @@ public abstract class KafkaTestBase extends TestLogger {
 
     public static Properties secureProps = new Properties();
 
+    @Rule public final RetryRule retryRule = new RetryRule();
+
     // ------------------------------------------------------------------------
     //  Setup and teardown of the mini clusters
     // ------------------------------------------------------------------------

Reply via email to