This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7005a1fa4b6 KAFKA-17433 Add a deflake Github action (#17019)
7005a1fa4b6 is described below

commit 7005a1fa4b6f94cd9ddb0708bfedeafd27266f52
Author: David Arthur <[email protected]>
AuthorDate: Fri Aug 30 23:26:33 2024 -0400

    KAFKA-17433 Add a deflake Github action (#17019)
    
    This patch adds a "deflake" github action which can be used to run a single 
JUnit test or suites. It works by parameterizing the --tests Gradle option. If 
the test extends ClusterTest, the "deflake" workflow can repeat number of times 
by setting the kafka.cluster.test.repeat system property.
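    
    For example, the Gradle invocation behind a deflake run looks roughly like
    this (module, test pattern, and repeat count here are illustrative):
    
        ./gradlew -Pkafka.cluster.test.repeat=3 :core:test --tests '*SomeTest*'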
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .github/README.md                                  |  40 +++++++
 .github/workflows/deflake.yml                      |  76 +++++++++++++
 build.gradle                                       |   3 +
 .../kafka/test/junit/ClusterTestExtensions.java    | 124 +++++++++++++--------
 4 files changed, 197 insertions(+), 46 deletions(-)

diff --git a/.github/README.md b/.github/README.md
new file mode 100644
index 00000000000..ba4159e4614
--- /dev/null
+++ b/.github/README.md
@@ -0,0 +1,40 @@
+# GitHub Actions
+
+## Overview
+
+The entry point for our build is the "CI" workflow which is defined in ci.yml.
+This is used for both PR and trunk builds. The jobs and steps of the workflow
+are defined in build.yml.
+
+## Opting-in to GitHub Actions
+
+To opt-in to the new GitHub Actions workflows, simply name your branch with a
+prefix of "gh-". For example, `gh-KAFKA-17433-deflake`.
+
+## Disabling Email Notifications
+
+By default, GitHub sends an email for each failed action run. To change this,
+visit https://github.com/settings/notifications and find System -> Actions.
+Here you can change your notification preferences.
+
+## Publishing Build Scans
+
+> This only works for committers (who have ASF accounts on ge.apache.org).
+
+There are two ways committers can have build scans published. The simplest
+way is to push their branches to apache/kafka. This will allow GitHub Actions
+to have access to the repository secret needed to publish the scan.
+
+Alternatively, committers can create pull requests against their own forks and
+configure their own access key as a repository secret:
+
+Log in to https://ge.apache.org/, click on My Settings, and then Access Keys.
+
+Generate an Access Key and give it a name like "github-actions". Copy the key
+down somewhere safe.
+
+On your fork of apache/kafka, navigate to Settings -> Security -> Secrets and
+Variables -> Actions. In the Secrets tab, click Create a New Repository Secret.
+The name of the secret should be `GE_ACCESS_TOKEN` and the value should
+be `ge.apache.org=abc123` where "abc123" is substituted for the Access Key you
+previously generated.
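+
+Alternatively, if you have the GitHub CLI (`gh`) installed, the secret can be
+set from a terminal; a rough sketch, where the fork path and key value are
+placeholders:
+
+```
+gh secret set GE_ACCESS_TOKEN --repo your-github-id/kafka --body 'ge.apache.org=abc123'
+```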
diff --git a/.github/workflows/deflake.yml b/.github/workflows/deflake.yml
new file mode 100644
index 00000000000..8cb89e8be8a
--- /dev/null
+++ b/.github/workflows/deflake.yml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: 'Deflake a test'
+on:
+  workflow_dispatch:
+    inputs:
+      test-module:
+        description: 'Gradle sub-module which contains the test being de-flaked. Should be like :core'
+        required: true
+        type: string
+      test-pattern:
+        description: 'Test class to de-flake (must be a ClusterTest). Should be like *SomeTest*'
+        required: true
+        type: string
+      test-repeat:
+        description: 'Number of times to invoke the test'
+        required: true
+        type: number
+        default: 1
+      java-version:
+        description: 'Java version to use.'
+        required: true
+        type: string
+        default: '17'
+
+jobs:
+  deflake:
+    runs-on: ubuntu-latest
+    name: Deflake JUnit tests
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+        with:
+          persist-credentials: false
+      - name: Setup Gradle
+        uses: ./.github/actions/setup-gradle
+        with:
+          java-version: ${{ inputs.java-version }}
+          gradle-cache-read-only: true
+          develocity-access-key: ${{ secrets.GE_ACCESS_TOKEN }}
+      - name: Test
+        timeout-minutes: 60
+        run: |
+          ./gradlew --build-cache --scan --continue \
+          -PtestLoggingEvents=started,passed,skipped,failed \
+          -PignoreFailures=true -PmaxParallelForks=2 \
+          -Pkafka.cluster.test.repeat=${{ inputs.test-repeat }} \
+          ${{ inputs.test-module }}:test --tests ${{ inputs.test-pattern }}
+      - name: Archive JUnit reports
+        if: always()
+        uses: actions/upload-artifact@v4
+        id: junit-upload-artifact
+        with:
+          name: junit-reports-${{ inputs.java-version }}
+          path: |
+            **/build/reports/tests/test/*
+          if-no-files-found: ignore
+      - name: Parse JUnit tests
+        if: always()
+        run: python .github/scripts/junit.py >> $GITHUB_STEP_SUMMARY
+        env:
+          GITHUB_WORKSPACE: ${{ github.workspace }}
+          REPORT_URL: ${{ steps.junit-upload-artifact.outputs.artifact-url }}
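+
+# Example dispatch using the GitHub CLI (values below are illustrative):
+#
+#   gh workflow run deflake.yml -f test-module=:core \
+#     -f test-pattern='*SomeTest*' -f test-repeat=3 -f java-version=17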
diff --git a/build.gradle b/build.gradle
index e4a248ba846..ce655cbbf74 100644
--- a/build.gradle
+++ b/build.gradle
@@ -476,6 +476,9 @@ subprojects {
     maxHeapSize = defaultMaxHeapSize
     jvmArgs = defaultJvmArgs
 
+    // KAFKA-17433 Used by deflake.yml github action to repeat individual tests
+    systemProperty("kafka.cluster.test.repeat", 
project.findProperty("kafka.cluster.test.repeat"))
+
     testLogging {
       events = userTestLoggingEvents ?: testLoggingEvents
       showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index 99d124796eb..8ffab913334 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -50,6 +50,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -90,8 +91,19 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * will generate two invocations of "someTest" (since ClusterType.Both was given). For each invocation, the test class
  * SomeIntegrationTest will be instantiated, lifecycle methods (before/after) will be run, and "someTest" will be invoked.
  *
+ * A special system property "kafka.cluster.test.repeat" can be used to cause repeated invocation of the tests.
+ *
+ * For example:
+ *
+ * <pre>
+ * ./gradlew -Pkafka.cluster.test.repeat=3 :core:test
+ * </pre>
+ *
+ * will cause every ClusterTest in the :core module to be invoked three times.
  */
 public class ClusterTestExtensions implements TestTemplateInvocationContextProvider, BeforeEachCallback, AfterEachCallback {
+    public static final String CLUSTER_TEST_REPEAT_SYSTEM_PROP = "kafka.cluster.test.repeat";
+
     private static final String METRICS_METER_TICK_THREAD_PREFIX = "metrics-meter-tick-thread";
     private static final String SCALA_THREAD_PREFIX = "scala-";
     private static final String FORK_JOIN_POOL_THREAD_PREFIX = "ForkJoinPool";
@@ -124,13 +136,13 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
         // Process single @ClusterTest annotation
         ClusterTest clusterTestAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class);
         if (clusterTestAnnot != null) {
-            generatedContexts.addAll(processClusterTest(context, clusterTestAnnot, defaults));
+            generatedContexts.addAll(processClusterTests(context, new ClusterTest[]{clusterTestAnnot}, defaults));
         }
 
         // Process multiple @ClusterTest annotation within @ClusterTests
         ClusterTests clusterTestsAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class);
         if (clusterTestsAnnot != null) {
-            generatedContexts.addAll(processClusterTests(context, clusterTestsAnnot, defaults));
+            generatedContexts.addAll(processClusterTests(context, clusterTestsAnnot.value(), defaults));
         }
 
         if (generatedContexts.isEmpty()) {
@@ -163,15 +175,29 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
         return context.getStore(Namespace.create(context.getUniqueId()));
     }
 
+    private int getTestRepeatCount() {
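+        // Defaults to a single invocation when kafka.cluster.test.repeat is
+        // unset or not a valid integer.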
+        int count;
+        try {
+            String repeatCount = System.getProperty(CLUSTER_TEST_REPEAT_SYSTEM_PROP, "1");
+            count = Integer.parseInt(repeatCount);
+        } catch (NumberFormatException e) {
+            count = 1;
+        }
+        return count;
+    }
+
     List<TestTemplateInvocationContext> processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
         if (annot.value().trim().isEmpty()) {
             throw new IllegalStateException("ClusterTemplate value can't be empty string.");
         }
 
         String baseDisplayName = context.getRequiredTestMethod().getName();
-        List<TestTemplateInvocationContext> contexts = generateClusterConfigurations(context, annot.value())
-                .stream().flatMap(config -> config.clusterTypes().stream()
-                        .map(type -> type.invocationContexts(baseDisplayName, config))).collect(Collectors.toList());
+        int repeatCount = getTestRepeatCount();
+        List<TestTemplateInvocationContext> contexts = IntStream.range(0, repeatCount)
+            .mapToObj(__ -> generateClusterConfigurations(context, annot.value()).stream())
+            .flatMap(Function.identity())
+            .flatMap(config -> config.clusterTypes().stream().map(type -> type.invocationContexts(baseDisplayName, config)))
+            .collect(Collectors.toList());
 
         if (contexts.isEmpty()) {
             throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
@@ -181,16 +207,26 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
     }
 
     @SuppressWarnings("unchecked")
-    private List<ClusterConfig> generateClusterConfigurations(ExtensionContext context, String generateClustersMethods) {
+    private List<ClusterConfig> generateClusterConfigurations(
+        ExtensionContext context,
+        String generateClustersMethods
+    ) {
         Object testInstance = context.getTestInstance().orElse(null);
         Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods);
         return (List<ClusterConfig>) ReflectionUtils.invokeMethod(method, testInstance);
     }
 
-    private List<TestTemplateInvocationContext> processClusterTests(ExtensionContext context, ClusterTests annots, ClusterTestDefaults defaults) {
-
-        List<TestTemplateInvocationContext> ret = Arrays.stream(annots.value())
-                .flatMap(annot -> processClusterTestInternal(context, annot, defaults).stream()).collect(Collectors.toList());
+    private List<TestTemplateInvocationContext> processClusterTests(
+        ExtensionContext context,
+        ClusterTest[] clusterTests,
+        ClusterTestDefaults defaults
+    ) {
+        int repeatCount = getTestRepeatCount();
+        List<TestTemplateInvocationContext> ret = IntStream.range(0, repeatCount)
+            .mapToObj(__ -> Arrays.stream(clusterTests))
+            .flatMap(Function.identity())
+            .flatMap(clusterTest -> processClusterTestInternal(context, clusterTest, defaults).stream())
+            .collect(Collectors.toList());
 
         if (ret.isEmpty()) {
             throw new IllegalStateException("processClusterTests method should provide at least one config");
@@ -199,46 +235,42 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
         return ret;
     }
 
-    private List<TestTemplateInvocationContext> processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) {
-        List<TestTemplateInvocationContext> ret = processClusterTestInternal(context, annot, defaults);
-
-        if (ret.isEmpty()) {
-            throw new IllegalStateException("processClusterTest method should provide at least one config");
-        }
-
-        return ret;
-    }
-    private List<TestTemplateInvocationContext> processClusterTestInternal(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) {
-        Type[] types = annot.types().length == 0 ? defaults.types() : annot.types();
-        Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
-                .filter(e -> e.id() == -1)
-                .collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b));
+    private List<TestTemplateInvocationContext> processClusterTestInternal(
+        ExtensionContext context,
+        ClusterTest clusterTest,
+        ClusterTestDefaults defaults
+    ) {
+        Type[] types = clusterTest.types().length == 0 ? defaults.types() : clusterTest.types();
+        Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(clusterTest.serverProperties()))
+            .filter(e -> e.id() == -1)
+            .collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b));
 
-        Map<Integer, Map<String, String>> perServerProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
-                .filter(e -> e.id() != -1)
-                .collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(),
-                        Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b))));
+        Map<Integer, Map<String, String>> perServerProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(clusterTest.serverProperties()))
+            .filter(e -> e.id() != -1)
+            .collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(),
+                Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b))));
 
-        Map<Features, Short> features = Arrays.stream(annot.features())
-                .collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version));
+        Map<Features, Short> features = Arrays.stream(clusterTest.features())
+            .collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version));
 
         ClusterConfig config = ClusterConfig.builder()
-                .setTypes(new HashSet<>(Arrays.asList(types)))
-                .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
-                .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
-                .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
-                .setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : annot.autoStart() == AutoStart.YES)
-                .setListenerName(annot.listener().trim().isEmpty() ? null : annot.listener())
-                .setServerProperties(serverProperties)
-                .setPerServerProperties(perServerProperties)
-                .setSecurityProtocol(annot.securityProtocol())
-                .setMetadataVersion(annot.metadataVersion())
-                .setTags(Arrays.asList(annot.tags()))
-                .setFeatures(features)
-                .build();
-
-        return Arrays.stream(types).map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config))
-                .collect(Collectors.toList());
+            .setTypes(new HashSet<>(Arrays.asList(types)))
+            .setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() : clusterTest.brokers())
+            .setControllers(clusterTest.controllers() == 0 ? defaults.controllers() : clusterTest.controllers())
+            .setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker())
+            .setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
+            .setListenerName(clusterTest.listener().trim().isEmpty() ? null : clusterTest.listener())
+            .setServerProperties(serverProperties)
+            .setPerServerProperties(perServerProperties)
+            .setSecurityProtocol(clusterTest.securityProtocol())
+            .setMetadataVersion(clusterTest.metadataVersion())
+            .setTags(Arrays.asList(clusterTest.tags()))
+            .setFeatures(features)
+            .build();
+
+        return Arrays.stream(types)
+            .map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config))
+            .collect(Collectors.toList());
     }
 
     private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
