This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f51123c0803 [improve][ci] Add Netty leak detection reporting to Pulsar
CI (#24272)
f51123c0803 is described below
commit f51123c0803d40ab57db77c73bb4b0ce9aadb587
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 9 15:30:39 2025 +0300
[improve][ci] Add Netty leak detection reporting to Pulsar CI (#24272)
---
.github/workflows/pulsar-ci-flaky.yaml | 30 ++-
.github/workflows/pulsar-ci.yaml | 85 ++++++-
build/pulsar_ci_tool.sh | 62 +++++
buildtools/pom.xml | 9 +-
.../pulsar/tests/ExtendedNettyLeakDetector.java | 258 +++++++++++++++++++++
.../apache/pulsar/tests/PulsarTestListener.java | 34 ++-
conf/bkenv.sh | 2 +-
conf/pulsar_tools_env.sh | 2 +-
.../terraform-ansible/templates/pulsar_env.sh | 2 +-
pom.xml | 54 ++++-
.../PulsarByteBufAllocatorDefaultTest.java | 2 -
...ulsarByteBufAllocatorOomThrowExceptionTest.java | 2 -
tests/bc_2_0_0/pom.xml | 1 -
tests/bc_2_0_1/pom.xml | 1 -
tests/bc_2_6_0/pom.xml | 1 -
tests/docker-images/java-test-image/Dockerfile | 3 +
tests/docker-images/java-test-image/pom.xml | 9 +
.../docker-images/latest-version-image/Dockerfile | 3 +
tests/docker-images/latest-version-image/pom.xml | 9 +
tests/integration/pom.xml | 2 +-
.../tests/integration/containers/BKContainer.java | 5 +
.../tests/integration/containers/CSContainer.java | 5 +
.../integration/containers/ChaosContainer.java | 26 ++-
.../integration/containers/PulsarContainer.java | 34 ++-
.../tests/integration/containers/ZKContainer.java | 6 +
tests/pulsar-client-admin-shade-test/pom.xml | 1 -
tests/pulsar-client-all-shade-test/pom.xml | 1 -
tests/pulsar-client-shade-test/pom.xml | 1 -
28 files changed, 616 insertions(+), 34 deletions(-)
diff --git a/.github/workflows/pulsar-ci-flaky.yaml
b/.github/workflows/pulsar-ci-flaky.yaml
index 0924ecd02cc..e002d007b9c 100644
--- a/.github/workflows/pulsar-ci-flaky.yaml
+++ b/.github/workflows/pulsar-ci-flaky.yaml
@@ -25,9 +25,9 @@ on:
- branch-*
- pulsar-*
schedule:
- # scheduled job with JDK 17
- - cron: '0 12 * * *'
# scheduled job with JDK 21
+ - cron: '0 12 * * *'
+ # scheduled job with JDK 17
# if cron expression is changed, make sure to update the expression in
jdk_major_version step in preconditions job
- cron: '0 6 * * *'
workflow_dispatch:
@@ -61,6 +61,15 @@ on:
required: true
type: number
default: 10000
+ netty_leak_detection:
+ description: 'Controls Netty leak detection. When set to "report",
Netty leak detection is enabled. When set to "fail_on_leak", Netty leak
detection is enabled and a build job will fail if leaks are detected. When set
to "off", Netty leak detection is disabled.'
+ required: true
+ type: choice
+ options:
+ - 'report'
+ - 'fail_on_leak'
+ - 'off'
+ default: 'report'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}${{
github.event_name == 'workflow_dispatch' &&
github.event.inputs.jdk_major_version || '' }}
@@ -84,7 +93,8 @@ jobs:
need_owasp: ${{ steps.changes.outputs.need_owasp }}
collect_coverage: ${{ steps.check_coverage.outputs.collect_coverage }}
jdk_major_version: ${{ steps.jdk_major_version.outputs.jdk_major_version
}}
-
+ java_non_tests: ${{ steps.changes.outputs.java_non_tests }}
+ netty_leak_detection: ${{
steps.netty_leak_detection.outputs.netty_leak_detection }}
steps:
- name: Cancel scheduled jobs in forks by default
if: ${{ github.repository != 'apache/pulsar' && github.event_name ==
'schedule' }}
@@ -136,6 +146,13 @@ jobs:
|| (github.event_name == 'workflow_dispatch' &&
github.event.inputs.collect_coverage == 'true')
}}" >> $GITHUB_OUTPUT
+ - name: Set Netty leak detection mode
+ id: netty_leak_detection
+ run: |
+ echo "netty_leak_detection=${{
+ github.event_name == 'workflow_dispatch' &&
github.event.inputs.netty_leak_detection || 'report'
+ }}" >> $GITHUB_OUTPUT
+
- name: Check if the PR has been approved for testing
if: ${{ steps.check_changes.outputs.docs_only != 'true' &&
github.repository == 'apache/pulsar' && github.event_name == 'pull_request' }}
env:
@@ -156,6 +173,8 @@ jobs:
TRACE_TEST_RESOURCE_CLEANUP_DIR: ${{ github.workspace
}}/target/trace-test-resource-cleanup
THREAD_LEAK_DETECTOR_WAIT_MILLIS: ${{ github.event_name ==
'workflow_dispatch' && github.event.inputs.thread_leak_detector_wait_millis ||
10000 }}
THREAD_LEAK_DETECTOR_DIR: ${{ github.workspace
}}/target/thread-leak-dumps
+ NETTY_LEAK_DETECTION: "${{
needs.preconditions.outputs.netty_leak_detection }}"
+ NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
runs-on: ubuntu-22.04
timeout-minutes: 100
if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
@@ -226,6 +245,10 @@ jobs:
cat threadleak*.txt | awk '/^Summary:/ {print "::warning::" $0
"\n"; next} {print}'
fi
+ - name: Report detected Netty leaks
+ if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+ run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
- name: Create Jacoco reports
if: ${{ needs.preconditions.outputs.collect_coverage == 'true' }}
continue-on-error: true
@@ -268,6 +291,7 @@ jobs:
/tmp/*.hprof
**/hs_err_*.log
**/core.*
+ ${{ env.NETTY_LEAK_DUMP_DIR }}/*
${{ env.TRACE_TEST_RESOURCE_CLEANUP_DIR }}/*
${{ env.THREAD_LEAK_DETECTOR_DIR }}/*
retention-days: 7
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 9720c72057e..49c31cb56ae 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -61,6 +61,15 @@ on:
required: true
type: number
default: 10000
+ netty_leak_detection:
+ description: 'Controls Netty leak detection. When set to "report",
Netty leak detection is enabled. When set to "fail_on_leak", Netty leak
detection is enabled and a build job will fail if leaks are detected. When set
to "off", Netty leak detection is disabled.'
+ required: true
+ type: choice
+ options:
+ - 'report'
+ - 'fail_on_leak'
+ - 'off'
+ default: 'report'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}${{
github.event_name == 'workflow_dispatch' &&
github.event.inputs.jdk_major_version || '' }}
@@ -85,6 +94,7 @@ jobs:
collect_coverage: ${{ steps.check_coverage.outputs.collect_coverage }}
jdk_major_version: ${{ steps.jdk_major_version.outputs.jdk_major_version
}}
java_non_tests: ${{ steps.changes.outputs.java_non_tests }}
+ netty_leak_detection: ${{
steps.netty_leak_detection.outputs.netty_leak_detection }}
steps:
- name: Cancel scheduled jobs in forks by default
if: ${{ github.repository != 'apache/pulsar' && github.event_name ==
'schedule' }}
@@ -136,6 +146,13 @@ jobs:
|| (github.event_name == 'workflow_dispatch' &&
github.event.inputs.collect_coverage == 'true')
}}" >> $GITHUB_OUTPUT
+ - name: Set Netty leak detection mode
+ id: netty_leak_detection
+ run: |
+ echo "netty_leak_detection=${{
+ github.event_name == 'workflow_dispatch' &&
github.event.inputs.netty_leak_detection || 'report'
+ }}" >> $GITHUB_OUTPUT
+
- name: Check if the PR has been approved for testing
if: ${{ steps.check_changes.outputs.docs_only != 'true' &&
github.repository == 'apache/pulsar' && github.event_name == 'pull_request' }}
env:
@@ -232,6 +249,8 @@ jobs:
TRACE_TEST_RESOURCE_CLEANUP_DIR: ${{ github.workspace
}}/target/trace-test-resource-cleanup
THREAD_LEAK_DETECTOR_WAIT_MILLIS: ${{ github.event_name ==
'workflow_dispatch' && github.event.inputs.thread_leak_detector_wait_millis ||
10000 }}
THREAD_LEAK_DETECTOR_DIR: ${{ github.workspace
}}/target/thread-leak-dumps
+ NETTY_LEAK_DETECTION: "${{
needs.preconditions.outputs.netty_leak_detection }}"
+ NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
runs-on: ubuntu-22.04
timeout-minutes: ${{ matrix.timeout || 60 }}
needs: ['preconditions', 'build-and-license-check']
@@ -349,6 +368,10 @@ jobs:
cat threadleak*.txt | awk '/^Summary:/ {print "::warning::" $0
"\n"; next} {print}'
fi
+ - name: Report detected Netty leaks
+ if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+ run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
- name: Upload Surefire reports
uses: actions/upload-artifact@v4
if: ${{ !success() || env.TRACE_TEST_RESOURCE_CLEANUP != 'off' }}
@@ -366,6 +389,7 @@ jobs:
/tmp/*.hprof
**/hs_err_*.log
**/core.*
+ ${{ env.NETTY_LEAK_DUMP_DIR }}/*
${{ env.TRACE_TEST_RESOURCE_CLEANUP_DIR }}/*
${{ env.THREAD_LEAK_DETECTOR_DIR }}/*
retention-days: 7
@@ -554,6 +578,8 @@ jobs:
PULSAR_TEST_IMAGE_NAME: apachepulsar/java-test-image:latest
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version
}}
+ NETTY_LEAK_DETECTION: "${{
needs.preconditions.outputs.netty_leak_detection }}"
+ NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
strategy:
fail-fast: false
matrix:
@@ -702,6 +728,10 @@ jobs:
report_paths: 'test-reports/TEST-*.xml'
annotate_only: 'true'
+ - name: Report detected Netty leaks
+ if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+ run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
- name: Upload Surefire reports
uses: actions/upload-artifact@v4
if: ${{ !success() }}
@@ -710,6 +740,19 @@ jobs:
path: surefire-reports
retention-days: 7
+ - name: Upload possible heap dump, core dump or crash files
+ uses: actions/upload-artifact@v4
+ if: ${{ always() }}
+ with:
+ name: Integration-${{ matrix.upload_name || matrix.group }}-dumps
+ path: |
+ /tmp/*.hprof
+ **/hs_err_*.log
+ **/core.*
+ ${{ env.NETTY_LEAK_DUMP_DIR }}/*
+ retention-days: 7
+ if-no-files-found: ignore
+
- name: Upload container logs
uses: actions/upload-artifact@v4
if: ${{ !success() }}
@@ -972,6 +1015,8 @@ jobs:
PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version
}}
+ NETTY_LEAK_DETECTION: "${{
needs.preconditions.outputs.netty_leak_detection }}"
+ NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
strategy:
fail-fast: false
matrix:
@@ -1079,6 +1124,10 @@ jobs:
report_paths: 'test-reports/TEST-*.xml'
annotate_only: 'true'
+ - name: Report detected Netty leaks
+ if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+ run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
- name: Upload container logs
uses: actions/upload-artifact@v4
if: ${{ !success() }}
@@ -1096,6 +1145,19 @@ jobs:
path: surefire-reports
retention-days: 7
+ - name: Upload possible heap dump, core dump or crash files
+ uses: actions/upload-artifact@v4
+ if: ${{ always() }}
+ with:
+ name: System-${{ matrix.group }}-dumps
+ path: |
+ /tmp/*.hprof
+ **/hs_err_*.log
+ **/core.*
+ ${{ env.NETTY_LEAK_DUMP_DIR }}/*
+ retention-days: 7
+ if-no-files-found: ignore
+
- name: Wait for ssh connection when build fails
# ssh access is enabled for builds in own forks
uses: ./.github/actions/ssh-access
@@ -1202,6 +1264,8 @@ jobs:
PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version
}}
+ NETTY_LEAK_DETECTION: "${{
needs.preconditions.outputs.netty_leak_detection }}"
+ NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
strategy:
fail-fast: false
matrix:
@@ -1286,12 +1350,16 @@ jobs:
report_paths: 'test-reports/TEST-*.xml'
annotate_only: 'true'
+ - name: Report detected Netty leaks
+ if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+ run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
- name: Upload container logs
uses: actions/upload-artifact@v4
if: ${{ !success() }}
continue-on-error: true
with:
- name: System-${{ matrix.group }}-container-logs
+ name: Flaky-System-${{ matrix.group }}-container-logs
path: tests/integration/target/container-logs
retention-days: 7
@@ -1299,10 +1367,23 @@ jobs:
uses: actions/upload-artifact@v4
if: ${{ !success() }}
with:
- name: System-${{ matrix.name }}-surefire-reports
+ name: Flaky-System-${{ matrix.name }}-surefire-reports
path: surefire-reports
retention-days: 7
+ - name: Upload possible heap dump, core dump or crash files
+ uses: actions/upload-artifact@v4
+ if: ${{ always() }}
+ with:
+ name: Flaky-System-${{ matrix.group }}-dumps
+ path: |
+ /tmp/*.hprof
+ **/hs_err_*.log
+ **/core.*
+ ${{ env.NETTY_LEAK_DUMP_DIR }}/*
+ retention-days: 7
+ if-no-files-found: ignore
+
- name: Wait for ssh connection when build fails
# ssh access is enabled for builds in own forks
uses: ./.github/actions/ssh-access
diff --git a/build/pulsar_ci_tool.sh b/build/pulsar_ci_tool.sh
index 3d63f104cd5..ac2b9173af0 100755
--- a/build/pulsar_ci_tool.sh
+++ b/build/pulsar_ci_tool.sh
@@ -579,6 +579,68 @@ ci_create_inttest_coverage_report() {
echo "::endgroup::"
}
+# Collects netty_leak_*.txt reports from $NETTY_LEAK_DUMP_DIR and from integration test container log archives,
+# prints a summary and exits with status 1 when leaks are found and NETTY_LEAK_DETECTION is "fail_on_leak".
+ci_report_netty_leaks() {
+  if [ -z "$NETTY_LEAK_DUMP_DIR" ]; then
+    echo "NETTY_LEAK_DUMP_DIR isn't set"
+    return 0
+  fi
+  local temp_file
+  temp_file=$(mktemp -t netty_leak.XXXX) || return 1
+
+  # concat all netty_leak_*.txt files in the dump directory to the temp file
+  if [ -d "$NETTY_LEAK_DUMP_DIR" ]; then
+    find "$NETTY_LEAK_DUMP_DIR" -maxdepth 1 -type f -name "netty_leak_*.txt" -exec cat {} \; >> "$temp_file"
+  fi
+
+  # extract netty_leak_*.txt files from the container log archives
+  local container_logs_dir="tests/integration/target/container-logs"
+  if [ -d "$container_logs_dir" ]; then
+    local container_netty_leak_dump_dir="$NETTY_LEAK_DUMP_DIR/container-logs"
+    mkdir -p "$container_netty_leak_dump_dir"
+    while read -r file; do
+      # ".../container-logs/ltnizrzm-standalone/var-log-pulsar.tar.gz" -> use "ltnizrzm-standalone" as target dir
+      container_name=$(basename "$(dirname "$file")")
+      target_dir="$container_netty_leak_dump_dir/$container_name"
+      mkdir -p "$target_dir"
+      tar -C "$target_dir" -zxf "$file" --strip-components=1 --wildcards --wildcards-match-slash '*/netty_leak_*.txt' >/dev/null 2>&1 || true
+    done < <(find "$container_logs_dir" -type f -name "*.tar.gz")
+    # remove all empty directories (including the container dump dir itself when nothing was extracted)
+    find "$container_netty_leak_dump_dir" -type d -empty -delete
+    # append all extracted netty_leak_*.txt files to the temp file
+    if [ -d "$container_netty_leak_dump_dir" ]; then
+      find "$container_netty_leak_dump_dir" -type f -name "netty_leak_*.txt" -exec cat {} \; >> "$temp_file"
+    fi
+  fi
+
+  if [ -s "$temp_file" ]; then
+    local leak_found_log_message="::warning::Netty leaks found."
+    if [[ "$NETTY_LEAK_DETECTION" == "fail_on_leak" ]]; then
+      leak_found_log_message="::error::Netty leaks found. Failing the build since Netty leak detection is set to 'fail_on_leak'."
+    fi
+    {
+      echo "${leak_found_log_message}"
+      local test_file_locations=$(grep -h -i test "$temp_file" | grep org.apache | sed 's/^[[:space:]]*//;s/[[:space:]]*$//;s/^Hint: //' | sort -u || true)
+      if [[ -n "$test_file_locations" ]]; then
+        printf 'Test file locations in stack traces:\n\n'
+        echo "$test_file_locations"
+      fi
+      echo "Details:"
+      cat "$temp_file"
+    } | tee "$NETTY_LEAK_DUMP_DIR/leak_report.txt"
+    touch target/netty_leaks_found
+    if [[ "$NETTY_LEAK_DETECTION" == "fail_on_leak" ]]; then
+      rm -f "$temp_file"  # the cleanup at the end of the function isn't reached after exit
+      exit 1
+    fi
+  else
+    echo "No netty leaks found."
+    touch target/netty_leaks_not_found
+  fi
+  rm -f "$temp_file"
+}
+
if [ -z "$1" ]; then
echo "usage: $0 [ci_tool_function_name]"
echo "Available ci tool functions:"
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index bb94e087eec..a2edce1394e 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -162,12 +162,17 @@
</exclusion>
</exclusions>
</dependency>
- <!-- for testing FastThreadLocalStateCleaner -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git
a/buildtools/src/main/java/org/apache/pulsar/tests/ExtendedNettyLeakDetector.java
b/buildtools/src/main/java/org/apache/pulsar/tests/ExtendedNettyLeakDetector.java
new file mode 100644
index 00000000000..b270a1c8a2f
--- /dev/null
+++
b/buildtools/src/main/java/org/apache/pulsar/tests/ExtendedNettyLeakDetector.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import org.apache.logging.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A custom Netty leak detector used in Pulsar tests that dumps the detected
leaks to a file in a directory that is
+ * configured with the NETTY_LEAK_DUMP_DIR environment variable. This
directory defaults to java.io.tmpdir.
+ * The files will be named netty_leak_YYYYMMDD-HHMMSS.SSS.txt.
+ */
+public class ExtendedNettyLeakDetector<T> extends ResourceLeakDetector<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ExtendedNettyLeakDetector.class);
+ public static final String NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME
= "io.netty.customResourceLeakDetector";
+
+ private static final File DUMP_DIR =
+ new File(System.getenv().getOrDefault("NETTY_LEAK_DUMP_DIR",
System.getProperty("java.io.tmpdir")));
+ public static final String EXIT_JVM_ON_LEAK_SYSTEM_PROPERTY_NAME =
+ ExtendedNettyLeakDetector.class.getName() + ".exitJvmOnLeak";
+ public static final String
SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS_SYSTEM_PROPERTY_NAME =
+ ExtendedNettyLeakDetector.class.getName() +
".sleepAfterGCAndFinalizationMillis";
+ private static final long SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS =
Long.parseLong(System.getProperty(
+ SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS_SYSTEM_PROPERTY_NAME,
"10"));
+ private static boolean exitJvmOnLeak = Boolean.valueOf(
+ System.getProperty(EXIT_JVM_ON_LEAK_SYSTEM_PROPERTY_NAME,
"false"));
+ private static final boolean DEFAULT_EXIT_JVM_ON_LEAK = exitJvmOnLeak;
+ public static final String EXIT_JVM_DELAY_MILLIS_SYSTEM_PROPERTY_NAME =
+ ExtendedNettyLeakDetector.class.getName() + ".exitJvmDelayMillis";
+ private static final long EXIT_JVM_DELAY_MILLIS =
+
Long.parseLong(System.getProperty(EXIT_JVM_DELAY_MILLIS_SYSTEM_PROPERTY_NAME,
"1000"));
+ public static final String USE_SHUTDOWN_HOOK_SYSTEM_PROPERTY_NAME =
+ ExtendedNettyLeakDetector.class.getName() + ".useShutdownHook";
+ private static boolean useShutdownHook = Boolean.valueOf(
+ System.getProperty(USE_SHUTDOWN_HOOK_SYSTEM_PROPERTY_NAME,
"false"));
+ static {
+ maybeRegisterShutdownHook();
+ }
+
+ private boolean exitThreadStarted;
+ private static volatile String initialHint;
+
+ /**
+ * Triggers Netty leak detection.
+ * Passing "-XX:+UnlockExperimentalVMOptions -XX:ReferencesPerThread=0" to
the JVM will help to detect leaks
+ * with a shorter latency. When ReferencesPerThread is set to 0, the JVM
will use maximum parallelism for processing
+ * reference objects. Netty's leak detection relies on WeakReferences and
this setting will help to process them
+ * faster.
+ */
+ public static void triggerLeakDetection() {
+ if (!isEnabled()) {
+ return;
+ }
+ // run System.gc() to trigger finalization of objects and detection of
possible Netty leaks
+ System.gc();
+ System.runFinalization();
+ try {
+ // wait for WeakReference collection to complete
+ Thread.sleep(SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ triggerLeakReporting();
+ }
+
+ private static void triggerLeakReporting() {
+ // create a ByteBuf and release it to trigger leak detection
+ // this calls ResourceLeakDetector's reportLeak method when paranoid
is enabled.
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
+ // this triggers leak detection which forces tracking even when
paranoid isn't enabled
+ // check the source code of ResourceLeakDetector to see how it works
or step through it in a debugger
+ // This will call leak detector's trackForcibly method which will then
call ResourceLeakDetector's reportLeak
+ // trackForcibly gets called in
io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived
+ ByteBuf retainedSlice = buffer.retainedSlice();
+ retainedSlice.release();
+ buffer.release();
+ }
+
+ public ExtendedNettyLeakDetector(Class<?> resourceType, int
samplingInterval, long maxActive) {
+ super(resourceType, samplingInterval, maxActive);
+ }
+
+ public ExtendedNettyLeakDetector(Class<?> resourceType, int
samplingInterval) {
+ super(resourceType, samplingInterval);
+ }
+
+ public ExtendedNettyLeakDetector(String resourceType, int
samplingInterval, long maxActive) {
+ super(resourceType, samplingInterval, maxActive);
+ }
+
+ /**
+ * Set the initial hint to be used when reporting leaks.
+ * This hint will be printed alongside the stack trace of the creation of
the resource in the Netty leak report.
+ *
+ * @see ResourceLeakDetector#getInitialHint(String)
+ * @param initialHint the initial hint
+ */
+ public static void setInitialHint(String initialHint) {
+ ExtendedNettyLeakDetector.initialHint = initialHint;
+ }
+
+ @Override
+ protected boolean needReport() {
+ return true;
+ }
+
+ @Override
+ protected Object getInitialHint(String resourceType) {
+ String currentInitialHint = ExtendedNettyLeakDetector.initialHint;
+ if (currentInitialHint != null) {
+ return currentInitialHint;
+ } else {
+ return super.getInitialHint(resourceType);
+ }
+ }
+
+ @Override
+ protected void reportTracedLeak(String resourceType, String records) {
+ super.reportTracedLeak(resourceType, records);
+ dumpToFile(resourceType, records);
+ maybeExitJVM();
+ }
+
+ @Override
+ protected void reportUntracedLeak(String resourceType) {
+ super.reportUntracedLeak(resourceType);
+ dumpToFile(resourceType, null);
+ maybeExitJVM();
+ }
+
+ private synchronized void maybeExitJVM() {
+ if (exitThreadStarted) {
+ return;
+ }
+ if (exitJvmOnLeak) {
+ new Thread(() -> {
+ LOG.error("Exiting JVM due to Netty resource leak. Check logs
for more details. Dumped to {}",
+ DUMP_DIR.getAbsolutePath());
+ System.err.println("Exiting JVM due to Netty resource leak.
Check logs for more details. Dumped to "
+ + DUMP_DIR.getAbsolutePath());
+ triggerLeakDetectionBeforeJVMExit();
+ // shutdown log4j2 logging to prevent log truncation
+ LogManager.shutdown();
+ // flush log buffers
+ System.err.flush();
+ System.out.flush();
+ // exit JVM immediately
+ Runtime.getRuntime().halt(1);
+ }).start();
+ exitThreadStarted = true;
+ }
+ }
+
+ private void dumpToFile(String resourceType, String records) {
+ try {
+ if (!DUMP_DIR.exists()) {
+ DUMP_DIR.mkdirs();
+ }
+ String datetimePart =
+
DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss.SSS").format(ZonedDateTime.now());
+ File nettyLeakDumpFile =
+ new File(DUMP_DIR, "netty_leak_" + datetimePart + ".txt");
+ // Prefix the line to make it easier to show the errors in GitHub
Actions as annotations
+ String linePrefix = exitJvmOnLeak ? "::error::" : "::warning::";
+ try (PrintWriter out = new PrintWriter(new
FileWriter(nettyLeakDumpFile, true))) {
+ out.print(linePrefix);
+ if (records != null) {
+ out.println("Traced leak detected " + resourceType);
+ out.println(records);
+ } else {
+ out.println("Untraced leak detected " + resourceType);
+ }
+ out.println();
+ out.flush();
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot write thread leak dump", e);
+ }
+ }
+
+ public static boolean isExtendedNettyLeakDetectorEnabled() {
+ return ExtendedNettyLeakDetector.class.getName()
+
.equals(System.getProperty(NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME));
+ }
+
+ /**
+ * Disable exit on leak. This is useful when exitJvmOnLeak is enabled and
there's a test that is expected
+ * to leak resources. This method can be called before the test execution
begins.
+ * This will not disable the leak detection itself, only the exit on leak
behavior.
+ */
+ public static void disableExitJVMOnLeak() {
+ exitJvmOnLeak = false;
+ }
+
+ /**
+ * Restore exit on leak to original value. This is used to re-enable
exitJvmOnLeak feature after it was
+ * disabled for the duration of a specific test using disableExitJVMOnLeak.
+ */
+ public static void restoreExitJVMOnLeak() {
+ triggerLeakDetection();
+ exitJvmOnLeak = DEFAULT_EXIT_JVM_ON_LEAK;
+ }
+
+ /**
+ * Shutdown hook to trigger leak detection on JVM shutdown.
+ * This is useful when using the leak detector in actual production code
or in system tests which
+ * don't use don't have a test listener that would be calling
triggerLeakDetection before the JVM exits.
+ */
+ private static void maybeRegisterShutdownHook() {
+ if (!exitJvmOnLeak && useShutdownHook) {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (!isEnabled()) {
+ return;
+ }
+ triggerLeakDetectionBeforeJVMExit();
+ }, ExtendedNettyLeakDetector.class.getSimpleName() +
"ShutdownHook"));
+ }
+ }
+
+ private static void triggerLeakDetectionBeforeJVMExit() {
+ triggerLeakDetection();
+ // wait for a while
+ try {
+ Thread.sleep(EXIT_JVM_DELAY_MILLIS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ // trigger leak detection again to increase the chances of detecting
leaks
+ // this could be helpful if more objects were finalized asynchronously
during the delay
+ triggerLeakDetection();
+ }
+}
diff --git
a/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java
b/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java
index 2d1f1273272..cabfdf30069 100644
--- a/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java
@@ -19,16 +19,21 @@
package org.apache.pulsar.tests;
import java.util.Arrays;
+import org.testng.IExecutionListener;
+import org.testng.ISuite;
+import org.testng.ISuiteListener;
import org.testng.ITestContext;
import org.testng.ITestListener;
import org.testng.ITestResult;
import org.testng.SkipException;
import org.testng.internal.thread.ThreadTimeoutException;
-public class PulsarTestListener implements ITestListener {
+public class PulsarTestListener implements ITestListener, IExecutionListener,
ISuiteListener {
@Override
public void onTestStart(ITestResult result) {
+ ExtendedNettyLeakDetector.setInitialHint(String.format("Test: %s.%s",
result.getTestClass().getName(),
+ result.getMethod().getMethodName()));
System.out.format("------- Starting test %s.%s(%s)-------\n",
result.getTestClass(),
result.getMethod().getMethodName(),
Arrays.toString(result.getParameters()));
}
@@ -37,6 +42,7 @@ public class PulsarTestListener implements ITestListener {
public void onTestSuccess(ITestResult result) {
System.out.format("------- SUCCESS -- %s.%s(%s)-------\n",
result.getTestClass(),
result.getMethod().getMethodName(),
Arrays.toString(result.getParameters()));
+ ExtendedNettyLeakDetector.triggerLeakDetection();
}
@Override
@@ -52,6 +58,7 @@ public class PulsarTestListener implements ITestListener {
}
}
}
+ ExtendedNettyLeakDetector.triggerLeakDetection();
}
@Override
@@ -71,15 +78,36 @@ public class PulsarTestListener implements ITestListener {
@Override
public void onTestFailedButWithinSuccessPercentage(ITestResult result) {
-
+ ExtendedNettyLeakDetector.triggerLeakDetection();
}
@Override
public void onStart(ITestContext context) {
-
+ ExtendedNettyLeakDetector.setInitialHint("Starting test: " +
context.getName());
}
@Override
public void onFinish(ITestContext context) {
+ ExtendedNettyLeakDetector.triggerLeakDetection();
+ ExtendedNettyLeakDetector.setInitialHint("Finished test: " +
context.getName());
+ }
+
+ @Override
+ public void onFinish(ISuite suite) { // hint attached to resources created after the suite finished
+ ExtendedNettyLeakDetector.setInitialHint("Finished suite: " +
suite.getName());
+ }
+
+ @Override
+ public void onExecutionFinish() { // final leak detection after TestNG has finished all suites
+ if (!ExtendedNettyLeakDetector.isEnabled()) {
+ return;
+ }
+ ExtendedNettyLeakDetector.triggerLeakDetection();
+ try {
+ Thread.sleep(1000); // let objects finalized during the first pass be collected before the second pass
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ ExtendedNettyLeakDetector.triggerLeakDetection();
}
}
diff --git a/conf/bkenv.sh b/conf/bkenv.sh
index 6b7cfe6d2cb..5e8c2572a58 100644
--- a/conf/bkenv.sh
+++ b/conf/bkenv.sh
@@ -89,7 +89,7 @@ if [[ -z "$BOOKIE_GC_LOG" ]]; then
fi
# Extra options to be passed to the jvm
-BOOKIE_EXTRA_OPTS="${BOOKIE_EXTRA_OPTS:-"-Dio.netty.leakDetectionLevel=disabled
${PULSAR_EXTRA_OPTS:-"-Dio.netty.recycler.maxCapacityPerThread=4096"}"}"
+BOOKIE_EXTRA_OPTS="${BOOKIE_EXTRA_OPTS:-"-Dio.netty.leakDetection.level=disabled
${PULSAR_EXTRA_OPTS:-"-Dio.netty.recycler.maxCapacityPerThread=4096"}"}"
# Add extra paths to the bookkeeper classpath
# BOOKIE_EXTRA_CLASSPATH=
diff --git a/conf/pulsar_tools_env.sh b/conf/pulsar_tools_env.sh
index 96ee304bf0b..6020e0a863a 100755
--- a/conf/pulsar_tools_env.sh
+++ b/conf/pulsar_tools_env.sh
@@ -58,7 +58,7 @@ fi
PULSAR_MEM=${PULSAR_MEM:-"-Xmx128m -XX:MaxDirectMemorySize=128m"}
# Extra options to be passed to the jvm
-PULSAR_EXTRA_OPTS="${PULSAR_MEM} ${PULSAR_GC} ${PULSAR_GC_LOG}
-Dio.netty.leakDetectionLevel=disabled ${PULSAR_EXTRA_OPTS}"
+PULSAR_EXTRA_OPTS="${PULSAR_MEM} ${PULSAR_GC} ${PULSAR_GC_LOG}
-Dio.netty.leakDetection.level=disabled ${PULSAR_EXTRA_OPTS}"
# Add extra paths to the bookkeeper classpath
# PULSAR_EXTRA_CLASSPATH=
diff --git a/deployment/terraform-ansible/templates/pulsar_env.sh
b/deployment/terraform-ansible/templates/pulsar_env.sh
index 020b5d5ac3a..2638718ee55 100644
--- a/deployment/terraform-ansible/templates/pulsar_env.sh
+++ b/deployment/terraform-ansible/templates/pulsar_env.sh
@@ -48,7 +48,7 @@ PULSAR_MEM=" -Xms{{ max_heap_memory }} -Xmx{{ max_heap_memory
}} -XX:MaxDirectMe
PULSAR_GC=" -XX:+UseZGC -XX:+PerfDisableSharedMem -XX:+AlwaysPreTouch"
# Extra options to be passed to the jvm
-PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} -Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.maxCapacityPerThread=4096"
+PULSAR_EXTRA_OPTS="-Dio.netty.leakDetection.level=disabled
-Dio.netty.recycler.maxCapacityPerThread=4096 ${PULSAR_EXTRA_OPTS}"
# Add extra paths to the bookkeeper classpath
# PULSAR_EXTRA_CLASSPATH=
diff --git a/pom.xml b/pom.xml
index f04c89e596c..d6b6fbc23c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,8 +117,40 @@ flexible messaging model and an intuitive client
API.</description>
-XX:+EnableDynamicAgentLoading <!-- byte-buddy-agent and mockito-core
agent dynamic loading -->
-Xshare:off <!-- suppress sharing warning -->
${test.additional.args.jdk24}
+ ${test.netty.args}
</test.additional.args>
<test.additional.args.jdk24></test.additional.args.jdk24>
+ <!-- Netty related JVM args, including Netty leak detection enabled in
tests -->
+ <test.netty.args>
+ <!-- required for triggering GC to trigger leak detection synchronously
in tests -->
+ -XX:+UnlockExperimentalVMOptions -XX:ReferencesPerThread=0
-XX:+ParallelRefProcEnabled
+ <!--
+ enable optimized Netty direct memory buffer access
+
https://pulsar.apache.org/docs/4.0.x/client-libraries-java-setup/#enabling-optimized-netty-direct-memory-buffer-access
+ -->
+ --add-opens java.base/java.nio=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
+ -Dio.netty.tryReflectionSetAccessible=true
+ -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true
+ -Dpulsar.allocator.pooled=true
+ -Dpulsar.allocator.exit_on_oom=false
+ -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
+ ${test.netty.leakdetection.args}
+ </test.netty.args>
+ <test.netty.leakdetection.args>
+ <!-- use the custom resource leak detector in buildtools module -->
+
-Dio.netty.customResourceLeakDetector=org.apache.pulsar.tests.ExtendedNettyLeakDetector
+
-Dorg.apache.pulsar.tests.ExtendedNettyLeakDetector.exitJvmOnLeak=${testExitJvmOnLeak}
+
-Dorg.apache.pulsar.tests.ExtendedNettyLeakDetector.exitJvmDelayMillis=${testExitJvmOnLeakDelayMillis}
+ -Dio.netty.leakDetection.level=${testLeakDetectionLevel}
+ <!-- allows using paranoid leak detection with relatively low overhead
-->
+ -Dio.netty.leakDetection.targetRecords=16
+ -Dio.netty.leakDetection.acquireAndReleaseOnly=true
+ -Dio.netty.leakDetection.samplingInterval=32
+ </test.netty.leakdetection.args>
+ <testExitJvmOnLeak>false</testExitJvmOnLeak>
+ <testExitJvmOnLeakDelayMillis>1000</testExitJvmOnLeakDelayMillis>
+ <testLeakDetectionLevel>paranoid</testLeakDetectionLevel>
<testMaxHeapSize>1300M</testMaxHeapSize>
<testReuseFork>true</testReuseFork>
<testForkCount>4</testForkCount>
@@ -1833,12 +1865,8 @@ flexible messaging model and an intuitive client
API.</description>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <argLine>${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=${testHeapDumpPath} -XX:+ExitOnOutOfMemoryError
-Xmx${testMaxHeapSize} -XX:+UseZGC
- -Dpulsar.allocator.pooled=true
- -Dpulsar.allocator.leak_detection=Advanced
- -Dpulsar.allocator.exit_on_oom=false
- -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
- -Dio.netty.tryReflectionSetAccessible=true
+ <argLine>${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=${testHeapDumpPath}
+ -XX:+ExitOnOutOfMemoryError -Xmx${testMaxHeapSize} -XX:+UseZGC
${test.additional.args}
</argLine>
<reuseForks>${testReuseFork}</reuseForks>
@@ -2943,6 +2971,20 @@ flexible messaging model and an intuitive client
API.</description>
</properties>
</profile>
+ <!-- This profile is used to disable Netty leak detection in tests. It is
activated by setting the
+ environment variable NETTY_LEAK_DETECTION to off. -->
+ <profile>
+ <id>disableNettyLeakDetection</id>
+ <activation>
+ <property>
+ <name>env.NETTY_LEAK_DETECTION</name>
+ <value>off</value>
+ </property>
+ </activation>
+ <properties>
+ <test.netty.leakdetection.args></test.netty.leakdetection.args>
+ </properties>
+ </profile>
</profiles>
<repositories>
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
index eb31d835ec1..961c6f0176d 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
@@ -25,7 +25,6 @@ import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
@@ -48,7 +47,6 @@ public class PulsarByteBufAllocatorDefaultTest {
assertTrue(arguments.get(0) instanceof ByteBufAllocator);
assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
assertEquals(arguments.get(4), OutOfMemoryPolicy.FallbackToHeap);
- assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced);
})) {
assertFalse(called.get());
PulsarByteBufAllocator.createByteBufAllocator();
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
index 3f1dbc29bd9..9bb9ea1eafe 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
@@ -25,7 +25,6 @@ import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
@@ -49,7 +48,6 @@ public class PulsarByteBufAllocatorOomThrowExceptionTest {
assertTrue(arguments.get(0) instanceof ByteBufAllocator);
assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
assertEquals(arguments.get(4),
OutOfMemoryPolicy.ThrowException);
- assertEquals(arguments.get(6),
LeakDetectionPolicy.Advanced);
})) {
assertFalse(called.get());
PulsarByteBufAllocator.createByteBufAllocator();
diff --git a/tests/bc_2_0_0/pom.xml b/tests/bc_2_0_0/pom.xml
index bf7eced9373..89930ec50c0 100644
--- a/tests/bc_2_0_0/pom.xml
+++ b/tests/bc_2_0_0/pom.xml
@@ -92,7 +92,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument}
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
${test.additional.args}
</argLine>
<skipTests>false</skipTests>
diff --git a/tests/bc_2_0_1/pom.xml b/tests/bc_2_0_1/pom.xml
index c8951b30dc2..6e4cd8c4c0b 100644
--- a/tests/bc_2_0_1/pom.xml
+++ b/tests/bc_2_0_1/pom.xml
@@ -92,7 +92,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument}
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
${test.additional.args}
</argLine>
<skipTests>false</skipTests>
diff --git a/tests/bc_2_6_0/pom.xml b/tests/bc_2_6_0/pom.xml
index a563c913e92..44b658f2549 100644
--- a/tests/bc_2_6_0/pom.xml
+++ b/tests/bc_2_6_0/pom.xml
@@ -99,7 +99,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument}
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
${test.additional.args}
</argLine>
<skipTests>false</skipTests>
diff --git a/tests/docker-images/java-test-image/Dockerfile
b/tests/docker-images/java-test-image/Dockerfile
index 805f20a0570..9cf45130d02 100644
--- a/tests/docker-images/java-test-image/Dockerfile
+++ b/tests/docker-images/java-test-image/Dockerfile
@@ -39,3 +39,6 @@ RUN mv /etc/supervisord/conf.d/supervisord.conf
/etc/supervisord.conf
COPY target/certificate-authority /pulsar/certificate-authority/
COPY target/java-test-functions.jar /pulsar/examples/
+
+# copy buildtools.jar to /pulsar/lib so that
org.apache.pulsar.tests.ExtendedNettyLeakDetector can be used
+COPY target/buildtools.jar /pulsar/lib/
\ No newline at end of file
diff --git a/tests/docker-images/java-test-image/pom.xml
b/tests/docker-images/java-test-image/pom.xml
index cec56c2f237..3d265d37dba 100644
--- a/tests/docker-images/java-test-image/pom.xml
+++ b/tests/docker-images/java-test-image/pom.xml
@@ -91,6 +91,15 @@
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-server-distribution-bin.tar.gz</destFileName>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>buildtools</artifactId>
+ <version>${project.parent.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+
<outputDirectory>${project.build.directory}</outputDirectory>
+ <destFileName>buildtools.jar</destFileName>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git a/tests/docker-images/latest-version-image/Dockerfile
b/tests/docker-images/latest-version-image/Dockerfile
index 46271e4cb54..074f411b952 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -64,6 +64,9 @@ RUN chmod g+rx /pulsar/examples/python-examples/
# copy java test examples
COPY target/java-test-functions.jar /pulsar/examples/
+# copy buildtools.jar to /pulsar/lib so that
org.apache.pulsar.tests.ExtendedNettyLeakDetector can be used
+COPY target/buildtools.jar /pulsar/lib/
+
# copy go test examples
COPY --from=pulsar-function-go /go/bin /pulsar/examples/go-examples
diff --git a/tests/docker-images/latest-version-image/pom.xml
b/tests/docker-images/latest-version-image/pom.xml
index 6c538f17b7f..29108e529b7 100644
--- a/tests/docker-images/latest-version-image/pom.xml
+++ b/tests/docker-images/latest-version-image/pom.xml
@@ -94,6 +94,15 @@
<outputDirectory>${project.build.directory}/plugins</outputDirectory>
<destFileName>java-test-plugins.nar</destFileName>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>buildtools</artifactId>
+ <version>${project.parent.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+
<outputDirectory>${project.build.directory}</outputDirectory>
+ <destFileName>buildtools.jar</destFileName>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 8bb2ce7c7f9..9cab897cdb3 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -302,7 +302,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument} -XX:+ExitOnOutOfMemoryError
-Xmx1G -XX:MaxDirectMemorySize=1G
- -Dio.netty.leakDetectionLevel=advanced
-Dconfluent.version=${confluent.version}
-Djacoco.version=${jacoco-maven-plugin.version}
+ -Dconfluent.version=${confluent.version}
-Djacoco.version=${jacoco-maven-plugin.version}
-Dintegrationtest.coverage.enabled=${integrationtest.coverage.enabled}
-Dintegrationtest.coverage.dir=${integrationtest.coverage.dir}
${test.additional.args}
</argLine>
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
index 9ff48b2e6a1..64806a1f9ab 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
@@ -30,4 +30,9 @@ public class BKContainer extends PulsarContainer<BKContainer>
{
clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT,
INVALID_PORT);
tailContainerLog();
}
+
+ @Override
+ protected boolean isPassNettyLeakDetectionSystemProperties() {
+ return false;
+ }
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
index de67d567233..20c7a79dc44 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
@@ -39,4 +39,9 @@ public class CSContainer extends PulsarContainer<CSContainer>
{
protected boolean isCodeCoverageEnabled() {
return false;
}
+
+ @Override
+ protected boolean isPassNettyLeakDetectionSystemProperties() {
+ return false;
+ }
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index eb0acf33a89..59af8b54b9a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -19,12 +19,11 @@
package org.apache.pulsar.tests.integration.containers;
import com.github.dockerjava.api.DockerClient;
-
import java.util.Base64;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.testcontainers.containers.GenericContainer;
@@ -48,6 +47,29 @@ public class ChaosContainer<SelfT extends
ChaosContainer<SelfT>> extends Generic
addEnv("MALLOC_ARENA_MAX", "1");
}
+ protected void appendToEnv(String key, String value) {
+ String existingValue = getEnvMap().get(key);
+ if (existingValue == null) {
+ addEnv(key, value);
+ } else {
+ addEnv(key, existingValue + " " + value);
+ }
+ }
+
+ protected void passSystemPropertyInEnv(String envKey, String
systemPropertyName) {
+ passSystemPropertyInEnv(envKey, systemPropertyName,
System.getProperty(systemPropertyName));
+ }
+
+ protected void passSystemPropertyInEnv(String envKey, String
systemPropertyName, String systemPropertyValue) {
+ if (systemPropertyValue != null) {
+ String entryValue = "-D" + systemPropertyName + "=" +
systemPropertyValue;
+ if (StringUtils.containsWhitespace(systemPropertyValue)) {
+ entryValue = "\"" + entryValue + "\"";
+ }
+ appendToEnv(envKey, entryValue);
+ }
+ }
+
protected void beforeStop() {
if (null == getContainerId()) {
return;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 3cdb048aea5..104590045ce 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.containers;
import static java.time.temporal.ChronoUnit.SECONDS;
-
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -29,6 +28,7 @@ import java.util.UUID;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.tests.ExtendedNettyLeakDetector;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.testcontainers.containers.BindMode;
@@ -252,12 +252,42 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
configureCodeCoverage();
}
+ if (isPassNettyLeakDetectionSystemProperties()) {
+ passNettyLeakDetectionSystemProperties();
+ }
+
beforeStart();
super.start();
afterStart();
log.info("[{}] Start pulsar service {} at container {}",
getContainerName(), serviceName, getContainerId());
}
+ protected boolean isPassNettyLeakDetectionSystemProperties() {
+ return true;
+ }
+
+ protected void passNettyLeakDetectionSystemProperties() {
+ if (isPassNettyLeakDetectionSystemProperties()) {
+ String envKey = "PULSAR_EXTRA_OPTS";
+ // pass similar defaults as there is in conf/pulsar_env.sh
+ appendToEnv("PULSAR_EXTRA_OPTS",
+ "-Dpulsar.allocator.exit_on_oom=true
-Dio.netty.recycler.maxCapacityPerThread=4096");
+ passSystemPropertyInEnv(envKey,
ExtendedNettyLeakDetector.NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME);
+ if
(ExtendedNettyLeakDetector.isExtendedNettyLeakDetectorEnabled()) {
+ // enable shutdown hook for extended leak detector in
containers
+ passSystemPropertyInEnv(envKey,
ExtendedNettyLeakDetector.USE_SHUTDOWN_HOOK_SYSTEM_PROPERTY_NAME,
+ "true");
+ }
+ passSystemPropertyInEnv(envKey,
ExtendedNettyLeakDetector.EXIT_JVM_ON_LEAK_SYSTEM_PROPERTY_NAME);
+ passSystemPropertyInEnv(envKey,
ExtendedNettyLeakDetector.EXIT_JVM_DELAY_MILLIS_SYSTEM_PROPERTY_NAME);
+ passSystemPropertyInEnv(envKey, "io.netty.leakDetection.level");
+ passSystemPropertyInEnv(envKey,
"io.netty.leakDetection.targetRecords");
+ passSystemPropertyInEnv(envKey,
"io.netty.leakDetection.samplingInterval");
+ passSystemPropertyInEnv(envKey,
"io.netty.leakDetection.acquireAndReleaseOnly");
+ addEnv("NETTY_LEAK_DUMP_DIR", "/var/log/pulsar");
+ }
+ }
+
protected boolean isCodeCoverageEnabled() {
return Boolean.getBoolean("integrationtest.coverage.enabled");
}
@@ -286,7 +316,7 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- withEnv("OPTS", "-javaagent:/jacocoDir/" + jacocoAgentJar.getName()
+ appendToEnv("OPTS", "-javaagent:/jacocoDir/" +
jacocoAgentJar.getName()
+ "=destfile=/jacocoDir/jacoco_" + getContainerName() +
"_" + System.currentTimeMillis() + ".exec"
+
",includes=org.apache.pulsar.*:org.apache.bookkeeper.mledger.*"
+ ",excludes=*.proto.*:*.shade.*:*.shaded.*");
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
index c55eb3242b4..2a372475120 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
@@ -39,4 +39,10 @@ public class ZKContainer extends
PulsarContainer<ZKContainer> {
protected boolean isCodeCoverageEnabled() {
return false;
}
+
+
+ @Override
+ protected boolean isPassNettyLeakDetectionSystemProperties() {
+ return false;
+ }
}
diff --git a/tests/pulsar-client-admin-shade-test/pom.xml
b/tests/pulsar-client-admin-shade-test/pom.xml
index 1ce482ad232..f48b840f1ca 100644
--- a/tests/pulsar-client-admin-shade-test/pom.xml
+++ b/tests/pulsar-client-admin-shade-test/pom.xml
@@ -107,7 +107,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument}
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
${test.additional.args}
</argLine>
<skipTests>false</skipTests>
diff --git a/tests/pulsar-client-all-shade-test/pom.xml
b/tests/pulsar-client-all-shade-test/pom.xml
index ce2705bb3c9..2e11e8fe53d 100644
--- a/tests/pulsar-client-all-shade-test/pom.xml
+++ b/tests/pulsar-client-all-shade-test/pom.xml
@@ -106,7 +106,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument}
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
${test.additional.args}
</argLine>
<skipTests>false</skipTests>
diff --git a/tests/pulsar-client-shade-test/pom.xml
b/tests/pulsar-client-shade-test/pom.xml
index 7a5746c7861..49ba7c38494 100644
--- a/tests/pulsar-client-shade-test/pom.xml
+++ b/tests/pulsar-client-shade-test/pom.xml
@@ -101,7 +101,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument}
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
${test.additional.args}
</argLine>
<skipTests>false</skipTests>