This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7005a1fa4b6 KAFKA-17433 Add a deflake Github action (#17019)
7005a1fa4b6 is described below
commit 7005a1fa4b6f94cd9ddb0708bfedeafd27266f52
Author: David Arthur <[email protected]>
AuthorDate: Fri Aug 30 23:26:33 2024 -0400
KAFKA-17433 Add a deflake Github action (#17019)
This patch adds a "deflake" github action which can be used to run a single
JUnit test or suites. It works by parameterizing the --tests Gradle option. If
the test extends ClusterTest, the "deflake" workflow can repeat it a number of
times by setting the kafka.cluster.test.repeat system property.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.github/README.md | 40 +++++++
.github/workflows/deflake.yml | 76 +++++++++++++
build.gradle | 3 +
.../kafka/test/junit/ClusterTestExtensions.java | 124 +++++++++++++--------
4 files changed, 197 insertions(+), 46 deletions(-)
diff --git a/.github/README.md b/.github/README.md
new file mode 100644
index 00000000000..ba4159e4614
--- /dev/null
+++ b/.github/README.md
@@ -0,0 +1,40 @@
+# GitHub Actions
+
+## Overview
+
+The entry point for our build is the "CI" workflow which is defined in ci.yml.
+This is used for both PR and trunk builds. The jobs and steps of the workflow
+are defined in build.yml.
+
+## Opting-in to GitHub Actions
+
+To opt-in to the new GitHub actions workflows, simply name your branch with a
+prefix of "gh-". For example, `gh-KAFKA-17433-deflake`
+
+## Disabling Email Notifications
+
+By default, GitHub sends an email for each failed action run. To change this,
+visit https://github.com/settings/notifications and find System -> Actions.
+Here you can change your notification preferences.
+
+## Publishing Build Scans
+
+> This only works for committers (who have ASF accounts on ge.apache.org).
+
+There are two ways committers can have build scans published. The simplest
+way is to push their branches to apache/kafka. This will allow GitHub Actions to
+have access to the repository secret needed to publish the scan.
+
+Alternatively, committers can create pull requests against their own forks and
+configure their own access key as a repository secret.
+
+Log in to https://ge.apache.org/, click on My Settings and then Access Keys
+
+Generate an Access Key and give it a name like "github-actions". Copy the key
+down somewhere safe.
+
+On your fork of apache/kafka, navigate to Settings -> Security -> Secrets and
+Variables -> Actions. In the Secrets tab, click Create a New Repository Secret.
+The name of the secret should be `GE_ACCESS_TOKEN` and the value should
+be `ge.apache.org=abc123` where "abc123" is substituted for the Access Key you
+previously generated.
diff --git a/.github/workflows/deflake.yml b/.github/workflows/deflake.yml
new file mode 100644
index 00000000000..8cb89e8be8a
--- /dev/null
+++ b/.github/workflows/deflake.yml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: 'Deflake a test'
+on:
+ workflow_dispatch:
+ inputs:
+ test-module:
+ description: 'Gradle sub-module which contains the test being de-flaked. Should be like :core'
+ required: true
+ type: string
+ test-pattern:
+ description: 'Test class to de-flake (must be a ClusterTest). Should be like *SomeTest*'
+ required: true
+ type: string
+ test-repeat:
+ description: 'Number of times to invoke the test'
+ required: true
+ type: number
+ default: 1
+ java-version:
+ description: 'Java version to use.'
+ required: true
+ type: string
+ default: '17'
+
+jobs:
+ deflake:
+ runs-on: ubuntu-latest
+ name: Deflake JUnit tests
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
+ with:
+ persist-credentials: false
+ - name: Setup Gradle
+ uses: ./.github/actions/setup-gradle
+ with:
+ java-version: ${{ inputs.java-version }}
+ gradle-cache-read-only: true
+ develocity-access-key: ${{ secrets.GE_ACCESS_TOKEN }}
+ - name: Test
+ timeout-minutes: 60
+ run: |
+ ./gradlew --build-cache --scan --continue \
+ -PtestLoggingEvents=started,passed,skipped,failed \
+ -PignoreFailures=true -PmaxParallelForks=2 \
+ -Pkafka.cluster.test.repeat=${{ inputs.test-repeat }} \
+ ${{ inputs.test-module }}:test --tests ${{ inputs.test-pattern }}
+ - name: Archive JUnit reports
+ if: always()
+ uses: actions/upload-artifact@v4
+ id: junit-upload-artifact
+ with:
+ name: junit-reports-${{ inputs.java-version }}
+ path: |
+ **/build/reports/tests/test/*
+ if-no-files-found: ignore
+ - name: Parse JUnit tests
+ if: always()
+ run: python .github/scripts/junit.py >> $GITHUB_STEP_SUMMARY
+ env:
+ GITHUB_WORKSPACE: ${{ github.workspace }}
+ REPORT_URL: ${{ steps.junit-upload-artifact.outputs.artifact-url }}
diff --git a/build.gradle b/build.gradle
index e4a248ba846..ce655cbbf74 100644
--- a/build.gradle
+++ b/build.gradle
@@ -476,6 +476,9 @@ subprojects {
maxHeapSize = defaultMaxHeapSize
jvmArgs = defaultJvmArgs
+ // KAFKA-17433 Used by deflake.yml github action to repeat individual tests
+ systemProperty("kafka.cluster.test.repeat",
project.findProperty("kafka.cluster.test.repeat"))
+
testLogging {
events = userTestLoggingEvents ?: testLoggingEvents
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index 99d124796eb..8ffab913334 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -50,6 +50,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -90,8 +91,19 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* will generate two invocations of "someTest" (since ClusterType.Both was given). For each invocation, the test class
* SomeIntegrationTest will be instantiated, lifecycle methods (before/after) will be run, and "someTest" will be invoked.
*
+ * A special system property "kafka.cluster.test.repeat" can be used to cause repeated invocation of the tests.
+ *
+ * For example:
+ *
+ * <pre>
+ * ./gradlew -Pkafka.cluster.test.repeat=3 :core:test
+ * </pre>
+ *
+ * will cause all ClusterTest-s in the :core module to be invoked three times.
*/
public class ClusterTestExtensions implements
TestTemplateInvocationContextProvider, BeforeEachCallback, AfterEachCallback {
+ public static final String CLUSTER_TEST_REPEAT_SYSTEM_PROP =
"kafka.cluster.test.repeat";
+
private static final String METRICS_METER_TICK_THREAD_PREFIX =
"metrics-meter-tick-thread";
private static final String SCALA_THREAD_PREFIX = "scala-";
private static final String FORK_JOIN_POOL_THREAD_PREFIX = "ForkJoinPool";
@@ -124,13 +136,13 @@ public class ClusterTestExtensions implements
TestTemplateInvocationContextProvi
// Process single @ClusterTest annotation
ClusterTest clusterTestAnnot =
context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class);
if (clusterTestAnnot != null) {
- generatedContexts.addAll(processClusterTest(context,
clusterTestAnnot, defaults));
+ generatedContexts.addAll(processClusterTests(context, new
ClusterTest[]{clusterTestAnnot}, defaults));
}
// Process multiple @ClusterTest annotation within @ClusterTests
ClusterTests clusterTestsAnnot =
context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class);
if (clusterTestsAnnot != null) {
- generatedContexts.addAll(processClusterTests(context,
clusterTestsAnnot, defaults));
+ generatedContexts.addAll(processClusterTests(context,
clusterTestsAnnot.value(), defaults));
}
if (generatedContexts.isEmpty()) {
@@ -163,15 +175,29 @@ public class ClusterTestExtensions implements
TestTemplateInvocationContextProvi
return context.getStore(Namespace.create(context.getUniqueId()));
}
+ private int getTestRepeatCount() {
+ int count;
+ try {
+ String repeatCount =
System.getProperty(CLUSTER_TEST_REPEAT_SYSTEM_PROP, "1");
+ count = Integer.parseInt(repeatCount);
+ } catch (NumberFormatException e) {
+ count = 1;
+ }
+ return count;
+ }
+
List<TestTemplateInvocationContext>
processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
if (annot.value().trim().isEmpty()) {
throw new IllegalStateException("ClusterTemplate value can't be
empty string.");
}
String baseDisplayName = context.getRequiredTestMethod().getName();
- List<TestTemplateInvocationContext> contexts =
generateClusterConfigurations(context, annot.value())
- .stream().flatMap(config -> config.clusterTypes().stream()
- .map(type -> type.invocationContexts(baseDisplayName,
config))).collect(Collectors.toList());
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> contexts = IntStream.range(0,
repeatCount)
+ .mapToObj(__ -> generateClusterConfigurations(context,
annot.value()).stream())
+ .flatMap(Function.identity())
+ .flatMap(config -> config.clusterTypes().stream().map(type ->
type.invocationContexts(baseDisplayName, config)))
+ .collect(Collectors.toList());
if (contexts.isEmpty()) {
throw new IllegalStateException("ClusterConfig generator method
should provide at least one config");
@@ -181,16 +207,26 @@ public class ClusterTestExtensions implements
TestTemplateInvocationContextProvi
}
@SuppressWarnings("unchecked")
- private List<ClusterConfig> generateClusterConfigurations(ExtensionContext
context, String generateClustersMethods) {
+ private List<ClusterConfig> generateClusterConfigurations(
+ ExtensionContext context,
+ String generateClustersMethods
+ ) {
Object testInstance = context.getTestInstance().orElse(null);
Method method =
ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(),
generateClustersMethods);
return (List<ClusterConfig>) ReflectionUtils.invokeMethod(method,
testInstance);
}
- private List<TestTemplateInvocationContext>
processClusterTests(ExtensionContext context, ClusterTests annots,
ClusterTestDefaults defaults) {
-
- List<TestTemplateInvocationContext> ret = Arrays.stream(annots.value())
- .flatMap(annot -> processClusterTestInternal(context, annot,
defaults).stream()).collect(Collectors.toList());
+ private List<TestTemplateInvocationContext> processClusterTests(
+ ExtensionContext context,
+ ClusterTest[] clusterTests,
+ ClusterTestDefaults defaults
+ ) {
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> ret = IntStream.range(0,
repeatCount)
+ .mapToObj(__ -> Arrays.stream(clusterTests))
+ .flatMap(Function.identity())
+ .flatMap(clusterTest -> processClusterTestInternal(context,
clusterTest, defaults).stream())
+ .collect(Collectors.toList());
if (ret.isEmpty()) {
throw new IllegalStateException("processClusterTests method should
provide at least one config");
@@ -199,46 +235,42 @@ public class ClusterTestExtensions implements
TestTemplateInvocationContextProvi
return ret;
}
- private List<TestTemplateInvocationContext>
processClusterTest(ExtensionContext context, ClusterTest annot,
ClusterTestDefaults defaults) {
- List<TestTemplateInvocationContext> ret =
processClusterTestInternal(context, annot, defaults);
-
- if (ret.isEmpty()) {
- throw new IllegalStateException("processClusterTest method should
provide at least one config");
- }
-
- return ret;
- }
- private List<TestTemplateInvocationContext>
processClusterTestInternal(ExtensionContext context, ClusterTest annot,
ClusterTestDefaults defaults) {
- Type[] types = annot.types().length == 0 ? defaults.types() :
annot.types();
- Map<String, String> serverProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(annot.serverProperties()))
- .filter(e -> e.id() == -1)
- .collect(Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b));
+ private List<TestTemplateInvocationContext> processClusterTestInternal(
+ ExtensionContext context,
+ ClusterTest clusterTest,
+ ClusterTestDefaults defaults
+ ) {
+ Type[] types = clusterTest.types().length == 0 ? defaults.types() :
clusterTest.types();
+ Map<String, String> serverProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(clusterTest.serverProperties()))
+ .filter(e -> e.id() == -1)
+ .collect(Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b));
- Map<Integer, Map<String, String>> perServerProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(annot.serverProperties()))
- .filter(e -> e.id() != -1)
- .collect(Collectors.groupingBy(ClusterConfigProperty::id,
Collectors.mapping(Function.identity(),
- Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b))));
+ Map<Integer, Map<String, String>> perServerProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(clusterTest.serverProperties()))
+ .filter(e -> e.id() != -1)
+ .collect(Collectors.groupingBy(ClusterConfigProperty::id,
Collectors.mapping(Function.identity(),
+ Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b))));
- Map<Features, Short> features = Arrays.stream(annot.features())
- .collect(Collectors.toMap(ClusterFeature::feature,
ClusterFeature::version));
+ Map<Features, Short> features = Arrays.stream(clusterTest.features())
+ .collect(Collectors.toMap(ClusterFeature::feature,
ClusterFeature::version));
ClusterConfig config = ClusterConfig.builder()
- .setTypes(new HashSet<>(Arrays.asList(types)))
- .setBrokers(annot.brokers() == 0 ? defaults.brokers() :
annot.brokers())
- .setControllers(annot.controllers() == 0 ?
defaults.controllers() : annot.controllers())
- .setDisksPerBroker(annot.disksPerBroker() == 0 ?
defaults.disksPerBroker() : annot.disksPerBroker())
- .setAutoStart(annot.autoStart() == AutoStart.DEFAULT ?
defaults.autoStart() : annot.autoStart() == AutoStart.YES)
- .setListenerName(annot.listener().trim().isEmpty() ? null :
annot.listener())
- .setServerProperties(serverProperties)
- .setPerServerProperties(perServerProperties)
- .setSecurityProtocol(annot.securityProtocol())
- .setMetadataVersion(annot.metadataVersion())
- .setTags(Arrays.asList(annot.tags()))
- .setFeatures(features)
- .build();
-
- return Arrays.stream(types).map(type ->
type.invocationContexts(context.getRequiredTestMethod().getName(), config))
- .collect(Collectors.toList());
+ .setTypes(new HashSet<>(Arrays.asList(types)))
+ .setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() :
clusterTest.brokers())
+ .setControllers(clusterTest.controllers() == 0 ?
defaults.controllers() : clusterTest.controllers())
+ .setDisksPerBroker(clusterTest.disksPerBroker() == 0 ?
defaults.disksPerBroker() : clusterTest.disksPerBroker())
+ .setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ?
defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
+ .setListenerName(clusterTest.listener().trim().isEmpty() ? null :
clusterTest.listener())
+ .setServerProperties(serverProperties)
+ .setPerServerProperties(perServerProperties)
+ .setSecurityProtocol(clusterTest.securityProtocol())
+ .setMetadataVersion(clusterTest.metadataVersion())
+ .setTags(Arrays.asList(clusterTest.tags()))
+ .setFeatures(features)
+ .build();
+
+ return Arrays.stream(types)
+ .map(type ->
type.invocationContexts(context.getRequiredTestMethod().getName(), config))
+ .collect(Collectors.toList());
}
private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {