This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f51123c0803 [improve][ci] Add Netty leak detection reporting to Pulsar 
CI (#24272)
f51123c0803 is described below

commit f51123c0803d40ab57db77c73bb4b0ce9aadb587
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 9 15:30:39 2025 +0300

    [improve][ci] Add Netty leak detection reporting to Pulsar CI (#24272)
---
 .github/workflows/pulsar-ci-flaky.yaml             |  30 ++-
 .github/workflows/pulsar-ci.yaml                   |  85 ++++++-
 build/pulsar_ci_tool.sh                            |  62 +++++
 buildtools/pom.xml                                 |   9 +-
 .../pulsar/tests/ExtendedNettyLeakDetector.java    | 258 +++++++++++++++++++++
 .../apache/pulsar/tests/PulsarTestListener.java    |  34 ++-
 conf/bkenv.sh                                      |   2 +-
 conf/pulsar_tools_env.sh                           |   2 +-
 .../terraform-ansible/templates/pulsar_env.sh      |   2 +-
 pom.xml                                            |  54 ++++-
 .../PulsarByteBufAllocatorDefaultTest.java         |   2 -
 ...ulsarByteBufAllocatorOomThrowExceptionTest.java |   2 -
 tests/bc_2_0_0/pom.xml                             |   1 -
 tests/bc_2_0_1/pom.xml                             |   1 -
 tests/bc_2_6_0/pom.xml                             |   1 -
 tests/docker-images/java-test-image/Dockerfile     |   3 +
 tests/docker-images/java-test-image/pom.xml        |   9 +
 .../docker-images/latest-version-image/Dockerfile  |   3 +
 tests/docker-images/latest-version-image/pom.xml   |   9 +
 tests/integration/pom.xml                          |   2 +-
 .../tests/integration/containers/BKContainer.java  |   5 +
 .../tests/integration/containers/CSContainer.java  |   5 +
 .../integration/containers/ChaosContainer.java     |  26 ++-
 .../integration/containers/PulsarContainer.java    |  34 ++-
 .../tests/integration/containers/ZKContainer.java  |   6 +
 tests/pulsar-client-admin-shade-test/pom.xml       |   1 -
 tests/pulsar-client-all-shade-test/pom.xml         |   1 -
 tests/pulsar-client-shade-test/pom.xml             |   1 -
 28 files changed, 616 insertions(+), 34 deletions(-)

diff --git a/.github/workflows/pulsar-ci-flaky.yaml 
b/.github/workflows/pulsar-ci-flaky.yaml
index 0924ecd02cc..e002d007b9c 100644
--- a/.github/workflows/pulsar-ci-flaky.yaml
+++ b/.github/workflows/pulsar-ci-flaky.yaml
@@ -25,9 +25,9 @@ on:
       - branch-*
       - pulsar-*
   schedule:
-    # scheduled job with JDK 17
-    - cron: '0 12 * * *'
     # scheduled job with JDK 21
+    - cron: '0 12 * * *'
+    # scheduled job with JDK 17
     # if cron expression is changed, make sure to update the expression in 
jdk_major_version step in preconditions job
     - cron: '0 6 * * *'
   workflow_dispatch:
@@ -61,6 +61,15 @@ on:
         required: true
         type: number
         default: 10000
+      netty_leak_detection:
+        description: 'Controls Netty leak detection. When set to "report", 
Netty leak detection is enabled. When set to "fail_on_leak", Netty leak 
detection is enabled and a build job will fail if leaks are detected. When set 
to "off", Netty leak detection is disabled.'
+        required: true
+        type: choice
+        options:
+          - 'report'
+          - 'fail_on_leak'
+          - 'off'
+        default: 'report'
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}${{ 
github.event_name == 'workflow_dispatch' && 
github.event.inputs.jdk_major_version || '' }}
@@ -84,7 +93,8 @@ jobs:
       need_owasp: ${{ steps.changes.outputs.need_owasp }}
       collect_coverage: ${{ steps.check_coverage.outputs.collect_coverage }}
       jdk_major_version: ${{ steps.jdk_major_version.outputs.jdk_major_version 
}}
-
+      java_non_tests: ${{ steps.changes.outputs.java_non_tests }}
+      netty_leak_detection: ${{ 
steps.netty_leak_detection.outputs.netty_leak_detection }}
     steps:
       - name: Cancel scheduled jobs in forks by default
         if: ${{ github.repository != 'apache/pulsar' && github.event_name == 
'schedule' }}
@@ -136,6 +146,13 @@ jobs:
           || (github.event_name == 'workflow_dispatch' && 
github.event.inputs.collect_coverage == 'true')
           }}"  >> $GITHUB_OUTPUT
 
+      - name: Set Netty leak detection mode
+        id: netty_leak_detection
+        run: |
+          echo "netty_leak_detection=${{ 
+          github.event_name == 'workflow_dispatch' && 
github.event.inputs.netty_leak_detection || 'report'
+          }}" >> $GITHUB_OUTPUT
+
       - name: Check if the PR has been approved for testing
         if: ${{ steps.check_changes.outputs.docs_only != 'true' && 
github.repository == 'apache/pulsar' && github.event_name == 'pull_request' }}
         env:
@@ -156,6 +173,8 @@ jobs:
       TRACE_TEST_RESOURCE_CLEANUP_DIR: ${{ github.workspace 
}}/target/trace-test-resource-cleanup
       THREAD_LEAK_DETECTOR_WAIT_MILLIS: ${{ github.event_name == 
'workflow_dispatch' && github.event.inputs.thread_leak_detector_wait_millis || 
10000 }}
       THREAD_LEAK_DETECTOR_DIR: ${{ github.workspace 
}}/target/thread-leak-dumps
+      NETTY_LEAK_DETECTION: "${{ 
needs.preconditions.outputs.netty_leak_detection }}"
+      NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
     runs-on: ubuntu-22.04
     timeout-minutes: 100
     if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
@@ -226,6 +245,10 @@ jobs:
             cat threadleak*.txt | awk '/^Summary:/ {print "::warning::" $0 
"\n"; next} {print}'  
           fi
 
+      - name: Report detected Netty leaks
+        if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+        run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
       - name: Create Jacoco reports
         if: ${{ needs.preconditions.outputs.collect_coverage == 'true' }}
         continue-on-error: true
@@ -268,6 +291,7 @@ jobs:
             /tmp/*.hprof
             **/hs_err_*.log
             **/core.*
+            ${{ env.NETTY_LEAK_DUMP_DIR }}/*
             ${{ env.TRACE_TEST_RESOURCE_CLEANUP_DIR }}/*
             ${{ env.THREAD_LEAK_DETECTOR_DIR }}/*
           retention-days: 7
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 9720c72057e..49c31cb56ae 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -61,6 +61,15 @@ on:
         required: true
         type: number
         default: 10000
+      netty_leak_detection:
+        description: 'Controls Netty leak detection. When set to "report", 
Netty leak detection is enabled. When set to "fail_on_leak", Netty leak 
detection is enabled and a build job will fail if leaks are detected. When set 
to "off", Netty leak detection is disabled.'
+        required: true
+        type: choice
+        options:
+          - 'report'
+          - 'fail_on_leak'
+          - 'off'
+        default: 'report'
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}${{ 
github.event_name == 'workflow_dispatch' && 
github.event.inputs.jdk_major_version || '' }}
@@ -85,6 +94,7 @@ jobs:
       collect_coverage: ${{ steps.check_coverage.outputs.collect_coverage }}
       jdk_major_version: ${{ steps.jdk_major_version.outputs.jdk_major_version 
}}
       java_non_tests: ${{ steps.changes.outputs.java_non_tests }}
+      netty_leak_detection: ${{ 
steps.netty_leak_detection.outputs.netty_leak_detection }}
     steps:
       - name: Cancel scheduled jobs in forks by default
         if: ${{ github.repository != 'apache/pulsar' && github.event_name == 
'schedule' }}
@@ -136,6 +146,13 @@ jobs:
           || (github.event_name == 'workflow_dispatch' && 
github.event.inputs.collect_coverage == 'true')
           }}"  >> $GITHUB_OUTPUT
 
+      - name: Set Netty leak detection mode
+        id: netty_leak_detection
+        run: |
+          echo "netty_leak_detection=${{ 
+          github.event_name == 'workflow_dispatch' && 
github.event.inputs.netty_leak_detection || 'report'
+          }}" >> $GITHUB_OUTPUT
+
       - name: Check if the PR has been approved for testing
         if: ${{ steps.check_changes.outputs.docs_only != 'true' && 
github.repository == 'apache/pulsar' && github.event_name == 'pull_request' }}
         env:
@@ -232,6 +249,8 @@ jobs:
       TRACE_TEST_RESOURCE_CLEANUP_DIR: ${{ github.workspace 
}}/target/trace-test-resource-cleanup
       THREAD_LEAK_DETECTOR_WAIT_MILLIS: ${{ github.event_name == 
'workflow_dispatch' && github.event.inputs.thread_leak_detector_wait_millis || 
10000 }}
       THREAD_LEAK_DETECTOR_DIR: ${{ github.workspace 
}}/target/thread-leak-dumps
+      NETTY_LEAK_DETECTION: "${{ 
needs.preconditions.outputs.netty_leak_detection }}"
+      NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
     runs-on: ubuntu-22.04
     timeout-minutes: ${{ matrix.timeout || 60 }}
     needs: ['preconditions', 'build-and-license-check']
@@ -349,6 +368,10 @@ jobs:
             cat threadleak*.txt | awk '/^Summary:/ {print "::warning::" $0 
"\n"; next} {print}'
           fi
 
+      - name: Report detected Netty leaks
+        if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+        run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
       - name: Upload Surefire reports
         uses: actions/upload-artifact@v4
         if: ${{ !success() || env.TRACE_TEST_RESOURCE_CLEANUP != 'off' }}
@@ -366,6 +389,7 @@ jobs:
             /tmp/*.hprof
             **/hs_err_*.log
             **/core.*
+            ${{ env.NETTY_LEAK_DUMP_DIR }}/*
             ${{ env.TRACE_TEST_RESOURCE_CLEANUP_DIR }}/*
             ${{ env.THREAD_LEAK_DETECTOR_DIR }}/*
           retention-days: 7
@@ -554,6 +578,8 @@ jobs:
       PULSAR_TEST_IMAGE_NAME: apachepulsar/java-test-image:latest
       DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
       CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version 
}}
+      NETTY_LEAK_DETECTION: "${{ 
needs.preconditions.outputs.netty_leak_detection }}"
+      NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
     strategy:
       fail-fast: false
       matrix:
@@ -702,6 +728,10 @@ jobs:
           report_paths: 'test-reports/TEST-*.xml'
           annotate_only: 'true'
 
+      - name: Report detected Netty leaks
+        if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+        run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
       - name: Upload Surefire reports
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
@@ -710,6 +740,19 @@ jobs:
           path: surefire-reports
           retention-days: 7
 
+      - name: Upload possible heap dump, core dump or crash files
+        uses: actions/upload-artifact@v4
+        if: ${{ always() }}
+        with:
+          name: Integration-${{ matrix.upload_name || matrix.group }}-dumps
+          path: |
+            /tmp/*.hprof
+            **/hs_err_*.log
+            **/core.*
+            ${{ env.NETTY_LEAK_DUMP_DIR }}/*
+          retention-days: 7
+          if-no-files-found: ignore
+
       - name: Upload container logs
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
@@ -972,6 +1015,8 @@ jobs:
       PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest
       DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
       CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version 
}}
+      NETTY_LEAK_DETECTION: "${{ 
needs.preconditions.outputs.netty_leak_detection }}"
+      NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
     strategy:
       fail-fast: false
       matrix:
@@ -1079,6 +1124,10 @@ jobs:
           report_paths: 'test-reports/TEST-*.xml'
           annotate_only: 'true'
 
+      - name: Report detected Netty leaks
+        if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+        run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
       - name: Upload container logs
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
@@ -1096,6 +1145,19 @@ jobs:
           path: surefire-reports
           retention-days: 7
 
+      - name: Upload possible heap dump, core dump or crash files
+        uses: actions/upload-artifact@v4
+        if: ${{ always() }}
+        with:
+          name: System-${{ matrix.group }}-dumps
+          path: |
+            /tmp/*.hprof
+            **/hs_err_*.log
+            **/core.*
+            ${{ env.NETTY_LEAK_DUMP_DIR }}/*
+          retention-days: 7
+          if-no-files-found: ignore
+
       - name: Wait for ssh connection when build fails
         # ssh access is enabled for builds in own forks
         uses: ./.github/actions/ssh-access
@@ -1202,6 +1264,8 @@ jobs:
       PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest
       DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
       CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version 
}}
+      NETTY_LEAK_DETECTION: "${{ 
needs.preconditions.outputs.netty_leak_detection }}"
+      NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps
     strategy:
       fail-fast: false
       matrix:
@@ -1286,12 +1350,16 @@ jobs:
           report_paths: 'test-reports/TEST-*.xml'
           annotate_only: 'true'
 
+      - name: Report detected Netty leaks
+        if: ${{ always() && env.NETTY_LEAK_DETECTION != 'off' }}
+        run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh report_netty_leaks
+
       - name: Upload container logs
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
         continue-on-error: true
         with:
-          name: System-${{ matrix.group }}-container-logs
+          name: Flaky-System-${{ matrix.group }}-container-logs
           path: tests/integration/target/container-logs
           retention-days: 7
 
@@ -1299,10 +1367,23 @@ jobs:
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
         with:
-          name: System-${{ matrix.name }}-surefire-reports
+          name: Flaky-System-${{ matrix.name }}-surefire-reports
           path: surefire-reports
           retention-days: 7
 
+      - name: Upload possible heap dump, core dump or crash files
+        uses: actions/upload-artifact@v4
+        if: ${{ always() }}
+        with:
+          name: Flaky-System-${{ matrix.group }}-dumps
+          path: |
+            /tmp/*.hprof
+            **/hs_err_*.log
+            **/core.*
+            ${{ env.NETTY_LEAK_DUMP_DIR }}/*
+          retention-days: 7
+          if-no-files-found: ignore
+
       - name: Wait for ssh connection when build fails
         # ssh access is enabled for builds in own forks
         uses: ./.github/actions/ssh-access
diff --git a/build/pulsar_ci_tool.sh b/build/pulsar_ci_tool.sh
index 3d63f104cd5..ac2b9173af0 100755
--- a/build/pulsar_ci_tool.sh
+++ b/build/pulsar_ci_tool.sh
@@ -579,6 +579,68 @@ ci_create_inttest_coverage_report() {
   echo "::endgroup::"
 }
 
+# Collects the Netty leak dumps written during a CI job and reports them in the job log.
+#
+# Leak dumps (netty_leak_*.txt) are gathered from "$NETTY_LEAK_DUMP_DIR" and from the *.tar.gz
+# archives found under tests/integration/target/container-logs. When leaks are found, a summary
+# followed by the full details is printed and also stored in "$NETTY_LEAK_DUMP_DIR/leak_report.txt",
+# and the marker file target/netty_leaks_found is created. Otherwise the marker file
+# target/netty_leaks_not_found is created. Relative paths are resolved against the current
+# working directory.
+#
+# Environment:
+#   NETTY_LEAK_DUMP_DIR  - directory containing the leak dumps; nothing is done when it is unset
+#   NETTY_LEAK_DETECTION - when set to "fail_on_leak", the script exits with status 1 if leaks are found
+ci_report_netty_leaks() {
+  if [ -z "$NETTY_LEAK_DUMP_DIR" ]; then
+    echo "NETTY_LEAK_DUMP_DIR isn't set"
+    return 0
+  fi
+  # declared separately from the assignment so that "local" doesn't mask a mktemp failure
+  local temp_file
+  temp_file=$(mktemp -t netty_leak.XXXX) || {
+    echo "Cannot create a temp file for the Netty leak report" >&2
+    return 1
+  }
+
+  # concat all netty_leak_*.txt files in the dump directory to a temp file
+  if [ -d "$NETTY_LEAK_DUMP_DIR" ]; then
+    find "$NETTY_LEAK_DUMP_DIR" -maxdepth 1 -type f -name "netty_leak_*.txt" -exec cat {} \; >> "$temp_file"
+  fi
+
+  # check if there are any netty_leak_*.txt files in the container logs
+  local container_logs_dir="tests/integration/target/container-logs"
+  if [ -d "$container_logs_dir" ]; then
+    local container_netty_leak_dump_dir="$NETTY_LEAK_DUMP_DIR/container-logs"
+    mkdir -p "$container_netty_leak_dump_dir"
+    # keep the loop variables local so that they don't leak into the caller's scope
+    local file container_name target_dir
+    while IFS= read -r file; do
+      # example file name "tests/integration/target/container-logs/ltnizrzm-standalone/var-log-pulsar.tar.gz"
+      # take ltnizrzm-standalone part
+      container_name=$(basename "$(dirname "$file")")
+      target_dir="$container_netty_leak_dump_dir/$container_name"
+      mkdir -p "$target_dir"
+      # best effort: tar fails for archives that don't contain any leak dumps, which is expected
+      tar -C "$target_dir" -zxf "$file" --strip-components=1 --wildcards --wildcards-match-slash '*/netty_leak_*.txt' >/dev/null 2>&1 || true
+    done < <(find "$container_logs_dir" -type f -name "*.tar.gz")
+    # remove all empty directories
+    find "$container_netty_leak_dump_dir" -type d -empty -delete
+    # the directory itself gets removed above when nothing was extracted, therefore check that it exists
+    # before printing all netty_leak_*.txt files in the container logs dump directory to the temp file
+    if [ -d "$container_netty_leak_dump_dir" ]; then
+      find "$container_netty_leak_dump_dir" -type f -name "netty_leak_*.txt" -exec cat {} \; >> "$temp_file"
+    fi
+  fi
+
+  if [ -s "$temp_file" ]; then
+    local leak_found_log_message
+    if [[ "$NETTY_LEAK_DETECTION" == "fail_on_leak" ]]; then
+      leak_found_log_message="::error::Netty leaks found. Failing the build since Netty leak detection is set to 'fail_on_leak'."
+    else
+      leak_found_log_message="::warning::Netty leaks found."
+    fi
+    {
+      echo "${leak_found_log_message}"
+      local test_file_locations
+      test_file_locations=$(grep -h -i test "$temp_file" | grep org.apache | sed 's/^[[:space:]]*//;s/[[:space:]]*$//;s/^Hint: //' | sort -u || true)
+      if [[ -n "$test_file_locations" ]]; then
+        echo "Test file locations in stack traces:"
+        echo
+        echo "$test_file_locations"
+      fi
+      echo "Details:"
+      cat "$temp_file"
+    } | tee "$NETTY_LEAK_DUMP_DIR/leak_report.txt"
+    # make sure that the marker file can be created even when the build didn't create the target directory
+    mkdir -p target
+    touch target/netty_leaks_found
+    if [[ "$NETTY_LEAK_DETECTION" == "fail_on_leak" ]]; then
+      # "exit" skips the cleanup at the end of the function, remove the temp file here
+      rm -f "$temp_file"
+      exit 1
+    fi
+  else
+    echo "No netty leaks found."
+    mkdir -p target
+    touch target/netty_leaks_not_found
+  fi
+  rm -f "$temp_file"
+}
+
 if [ -z "$1" ]; then
   echo "usage: $0 [ci_tool_function_name]"
   echo "Available ci tool functions:"
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index bb94e087eec..a2edce1394e 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -162,12 +162,17 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <!-- for testing FastThreadLocalStateCleaner -->
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
       <version>${netty.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>${netty.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
diff --git 
a/buildtools/src/main/java/org/apache/pulsar/tests/ExtendedNettyLeakDetector.java
 
b/buildtools/src/main/java/org/apache/pulsar/tests/ExtendedNettyLeakDetector.java
new file mode 100644
index 00000000000..b270a1c8a2f
--- /dev/null
+++ 
b/buildtools/src/main/java/org/apache/pulsar/tests/ExtendedNettyLeakDetector.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import org.apache.logging.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A custom Netty leak detector used in Pulsar tests that dumps the detected leaks to a file in a directory that is
+ * configured with the NETTY_LEAK_DUMP_DIR environment variable. This directory defaults to java.io.tmpdir.
+ * The files will be named netty_leak_YYYYMMDD-HHMMSS.SSS.txt.
+ * <p>
+ * The detector is selected by setting the {@value #NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME} system property
+ * to the name of this class (see {@link #isExtendedNettyLeakDetectorEnabled()}). The optional behaviors
+ * (exiting the JVM on a leak, delays, shutdown hook) are controlled with the system properties that are named by
+ * the {@code *_SYSTEM_PROPERTY_NAME} constants of this class.
+ */
+public class ExtendedNettyLeakDetector<T> extends ResourceLeakDetector<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(ExtendedNettyLeakDetector.class);
+    public static final String NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME = "io.netty.customResourceLeakDetector";
+
+    private static final File DUMP_DIR =
+            new File(System.getenv().getOrDefault("NETTY_LEAK_DUMP_DIR", System.getProperty("java.io.tmpdir")));
+    // DateTimeFormatter is immutable and thread-safe, so a single instance is shared by all dumps
+    private static final DateTimeFormatter DUMP_FILE_TIMESTAMP_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss.SSS");
+    public static final String EXIT_JVM_ON_LEAK_SYSTEM_PROPERTY_NAME =
+            ExtendedNettyLeakDetector.class.getName() + ".exitJvmOnLeak";
+    public static final String SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS_SYSTEM_PROPERTY_NAME =
+            ExtendedNettyLeakDetector.class.getName() + ".sleepAfterGCAndFinalizationMillis";
+    private static final long SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS = Long.parseLong(System.getProperty(
+            SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS_SYSTEM_PROPERTY_NAME, "10"));
+    // the value configured with the system property, used for restoring exitJvmOnLeak after it has been disabled
+    private static final boolean DEFAULT_EXIT_JVM_ON_LEAK = Boolean.parseBoolean(
+            System.getProperty(EXIT_JVM_ON_LEAK_SYSTEM_PROPERTY_NAME, "false"));
+    // volatile since the value is changed by test code and read by the threads that report leaks
+    private static volatile boolean exitJvmOnLeak = DEFAULT_EXIT_JVM_ON_LEAK;
+    public static final String EXIT_JVM_DELAY_MILLIS_SYSTEM_PROPERTY_NAME =
+            ExtendedNettyLeakDetector.class.getName() + ".exitJvmDelayMillis";
+    private static final long EXIT_JVM_DELAY_MILLIS =
+            Long.parseLong(System.getProperty(EXIT_JVM_DELAY_MILLIS_SYSTEM_PROPERTY_NAME, "1000"));
+    public static final String USE_SHUTDOWN_HOOK_SYSTEM_PROPERTY_NAME =
+            ExtendedNettyLeakDetector.class.getName() + ".useShutdownHook";
+    private static boolean useShutdownHook = Boolean.parseBoolean(
+            System.getProperty(USE_SHUTDOWN_HOOK_SYSTEM_PROPERTY_NAME, "false"));
+    static {
+        maybeRegisterShutdownHook();
+    }
+
+    // shared by all detector instances so that at most one exit thread gets started in the JVM,
+    // guarded by the class lock held in maybeExitJVM
+    private static boolean exitThreadStarted;
+    private static volatile String initialHint;
+
+    /**
+     * Triggers Netty leak detection.
+     * Passing "-XX:+UnlockExperimentalVMOptions -XX:ReferencesPerThread=0" to the JVM will help to detect leaks
+     * with a shorter latency. When ReferencesPerThread is set to 0, the JVM will use maximum parallelism for processing
+     * reference objects. Netty's leak detection relies on WeakReferences and this setting will help to process them
+     * faster.
+     * <p>
+     * This method does nothing when Netty leak detection is disabled.
+     */
+    public static void triggerLeakDetection() {
+        if (!isEnabled()) {
+            return;
+        }
+        // run System.gc() to trigger finalization of objects and detection of possible Netty leaks
+        System.gc();
+        System.runFinalization();
+        try {
+            // wait for WeakReference collection to complete
+            Thread.sleep(SLEEP_AFTER_GC_AND_FINALIZATION_MILLIS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        triggerLeakReporting();
+    }
+
+    /**
+     * Makes Netty report the leaks that have been detected so far by allocating and releasing buffers.
+     */
+    private static void triggerLeakReporting() {
+        // create a ByteBuf and release it to trigger leak detection
+        // this calls ResourceLeakDetector's reportLeak method when paranoid is enabled.
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
+        // this triggers leak detection which forces tracking even when paranoid isn't enabled
+        // check the source code of ResourceLeakDetector to see how it works or step through it in a debugger
+        // This will call leak detector's trackForcibly method which will then call ResourceLeakDetector's reportLeak
+        // trackForcibly gets called in io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived
+        ByteBuf retainedSlice = buffer.retainedSlice();
+        retainedSlice.release();
+        buffer.release();
+    }
+
+    public ExtendedNettyLeakDetector(Class<?> resourceType, int samplingInterval, long maxActive) {
+        super(resourceType, samplingInterval, maxActive);
+    }
+
+    public ExtendedNettyLeakDetector(Class<?> resourceType, int samplingInterval) {
+        super(resourceType, samplingInterval);
+    }
+
+    public ExtendedNettyLeakDetector(String resourceType, int samplingInterval, long maxActive) {
+        super(resourceType, samplingInterval, maxActive);
+    }
+
+    /**
+     * Set the initial hint to be used when reporting leaks.
+     * This hint will be printed alongside the stack trace of the creation of the resource in the Netty leak report.
+     *
+     * @see ResourceLeakDetector#getInitialHint(String)
+     * @param initialHint the initial hint
+     */
+    public static void setInitialHint(String initialHint) {
+        ExtendedNettyLeakDetector.initialHint = initialHint;
+    }
+
+    /**
+     * Leaks are always reported instead of leaving the decision to the default implementation.
+     */
+    @Override
+    protected boolean needReport() {
+        return true;
+    }
+
+    /**
+     * Returns the hint set with {@link #setInitialHint(String)}, or the default hint when no hint has been set.
+     */
+    @Override
+    protected Object getInitialHint(String resourceType) {
+        String currentInitialHint = ExtendedNettyLeakDetector.initialHint;
+        if (currentInitialHint != null) {
+            return currentInitialHint;
+        } else {
+            return super.getInitialHint(resourceType);
+        }
+    }
+
+    /**
+     * Reports the leak with the default implementation, dumps it to a file and exits the JVM if that is enabled.
+     */
+    @Override
+    protected void reportTracedLeak(String resourceType, String records) {
+        super.reportTracedLeak(resourceType, records);
+        dumpToFile(resourceType, records);
+        maybeExitJVM();
+    }
+
+    /**
+     * Reports the leak with the default implementation, dumps it to a file and exits the JVM if that is enabled.
+     */
+    @Override
+    protected void reportUntracedLeak(String resourceType) {
+        super.reportUntracedLeak(resourceType);
+        dumpToFile(resourceType, null);
+        maybeExitJVM();
+    }
+
+    /**
+     * Starts a thread that halts the JVM when exiting on leak is enabled.
+     * The thread is started at most once, regardless of the number of detector instances and reported leaks.
+     */
+    private static synchronized void maybeExitJVM() {
+        if (exitThreadStarted) {
+            return;
+        }
+        if (exitJvmOnLeak) {
+            new Thread(() -> {
+                LOG.error("Exiting JVM due to Netty resource leak. Check logs for more details. Dumped to {}",
+                        DUMP_DIR.getAbsolutePath());
+                System.err.println("Exiting JVM due to Netty resource leak. Check logs for more details. Dumped to "
+                        + DUMP_DIR.getAbsolutePath());
+                // give other pending leaks a chance to get reported and dumped before the JVM is halted
+                triggerLeakDetectionBeforeJVMExit();
+                // shutdown log4j2 logging to prevent log truncation
+                LogManager.shutdown();
+                // flush log buffers
+                System.err.flush();
+                System.out.flush();
+                // exit JVM immediately
+                Runtime.getRuntime().halt(1);
+            }).start();
+            exitThreadStarted = true;
+        }
+    }
+
+    /**
+     * Appends the leak report to a file in the dump directory. Failures are logged and otherwise ignored.
+     *
+     * @param resourceType the type of the leaked resource
+     * @param records the access records of a traced leak, or null for an untraced leak
+     */
+    private void dumpToFile(String resourceType, String records) {
+        try {
+            if (!DUMP_DIR.exists()) {
+                DUMP_DIR.mkdirs();
+            }
+            String datetimePart = DUMP_FILE_TIMESTAMP_FORMATTER.format(ZonedDateTime.now());
+            File nettyLeakDumpFile =
+                    new File(DUMP_DIR, "netty_leak_" + datetimePart + ".txt");
+            // Prefix the line to make it easier to show the errors in GitHub Actions as annotations
+            String linePrefix = exitJvmOnLeak ? "::error::" : "::warning::";
+            // append mode: leaks reported within the same millisecond share the file name and must not
+            // overwrite each other
+            try (PrintWriter out = new PrintWriter(new FileWriter(nettyLeakDumpFile, true))) {
+                out.print(linePrefix);
+                if (records != null) {
+                    out.println("Traced leak detected " + resourceType);
+                    out.println(records);
+                } else {
+                    out.println("Untraced leak detected " + resourceType);
+                }
+                out.println();
+                out.flush();
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot write Netty leak dump", e);
+        }
+    }
+
+    /**
+     * Checks whether this class has been configured as Netty's custom resource leak detector.
+     *
+     * @return true if the {@value #NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME} system property is set to
+     * the name of this class
+     */
+    public static boolean isExtendedNettyLeakDetectorEnabled() {
+        return ExtendedNettyLeakDetector.class.getName()
+                .equals(System.getProperty(NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME));
+    }
+
+    /**
+     * Disable exit on leak. This is useful when exitJvmOnLeak is enabled and there's a test that is expected
+     * to leak resources. This method can be called before the test execution begins.
+     * This will not disable the leak detection itself, only the exit on leak behavior.
+     */
+    public static void disableExitJVMOnLeak() {
+        exitJvmOnLeak = false;
+    }
+
+    /**
+     * Restore exit on leak to original value. This is used to re-enable exitJvmOnLeak feature after it was
+     * disabled for the duration of a specific test using disableExitJVMOnLeak.
+     */
+    public static void restoreExitJVMOnLeak() {
+        // report the pending leaks while exit on leak is still disabled
+        triggerLeakDetection();
+        exitJvmOnLeak = DEFAULT_EXIT_JVM_ON_LEAK;
+    }
+
+    /**
+     * Registers a shutdown hook that triggers leak detection on JVM shutdown. The hook is registered only when
+     * the shutdown hook is enabled with a system property and exit on leak isn't enabled.
+     * This is useful when using the leak detector in actual production code or in system tests which
+     * don't have a test listener that would be calling triggerLeakDetection before the JVM exits.
+     */
+    private static void maybeRegisterShutdownHook() {
+        if (!exitJvmOnLeak && useShutdownHook) {
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                if (!isEnabled()) {
+                    return;
+                }
+                triggerLeakDetectionBeforeJVMExit();
+            }, ExtendedNettyLeakDetector.class.getSimpleName() + "ShutdownHook"));
+        }
+    }
+
+    /**
+     * Triggers leak detection twice with a delay in between.
+     */
+    private static void triggerLeakDetectionBeforeJVMExit() {
+        triggerLeakDetection();
+        // wait for a while
+        try {
+            Thread.sleep(EXIT_JVM_DELAY_MILLIS);
+        } catch (InterruptedException e) {
+            // ignore, the JVM is about to exit
+        }
+        // trigger leak detection again to increase the chances of detecting leaks
+        // this could be helpful if more objects were finalized asynchronously during the delay
+        triggerLeakDetection();
+    }
+}
diff --git 
a/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java 
b/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java
index 2d1f1273272..cabfdf30069 100644
--- a/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java
@@ -19,16 +19,21 @@
 package org.apache.pulsar.tests;
 
 import java.util.Arrays;
+import org.testng.IExecutionListener;
+import org.testng.ISuite;
+import org.testng.ISuiteListener;
 import org.testng.ITestContext;
 import org.testng.ITestListener;
 import org.testng.ITestResult;
 import org.testng.SkipException;
 import org.testng.internal.thread.ThreadTimeoutException;
 
-public class PulsarTestListener implements ITestListener {
+public class PulsarTestListener implements ITestListener, IExecutionListener, 
ISuiteListener {
 
     @Override
     public void onTestStart(ITestResult result) {
+        ExtendedNettyLeakDetector.setInitialHint(String.format("Test: %s.%s", 
result.getTestClass().getName(),
+                result.getMethod().getMethodName()));
         System.out.format("------- Starting test %s.%s(%s)-------\n", 
result.getTestClass(),
                 result.getMethod().getMethodName(), 
Arrays.toString(result.getParameters()));
     }
@@ -37,6 +42,7 @@ public class PulsarTestListener implements ITestListener {
     public void onTestSuccess(ITestResult result) {
         System.out.format("------- SUCCESS -- %s.%s(%s)-------\n", 
result.getTestClass(),
                 result.getMethod().getMethodName(), 
Arrays.toString(result.getParameters()));
+        ExtendedNettyLeakDetector.triggerLeakDetection();
     }
 
     @Override
@@ -52,6 +58,7 @@ public class PulsarTestListener implements ITestListener {
                 }
             }
         }
+        ExtendedNettyLeakDetector.triggerLeakDetection();
     }
 
     @Override
@@ -71,15 +78,36 @@ public class PulsarTestListener implements ITestListener {
 
     @Override
     public void onTestFailedButWithinSuccessPercentage(ITestResult result) {
-
+        ExtendedNettyLeakDetector.triggerLeakDetection();
     }
 
     @Override
     public void onStart(ITestContext context) {
-
+        ExtendedNettyLeakDetector.setInitialHint("Starting test: " + 
context.getName());
     }
 
     @Override
     public void onFinish(ITestContext context) {
+        ExtendedNettyLeakDetector.triggerLeakDetection();
+        ExtendedNettyLeakDetector.setInitialHint("Finished test: " + 
context.getName());
+    }
+
+    @Override
+    public void onFinish(ISuite suite) {
+        ExtendedNettyLeakDetector.setInitialHint("Finished suite: " + 
suite.getName());
+    }
+
+    @Override
+    public void onExecutionFinish() {
+        // when leak detection is enabled: two detection passes, one second apart
+        if (ExtendedNettyLeakDetector.isEnabled()) {
+            ExtendedNettyLeakDetector.triggerLeakDetection();
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException interrupted) {
+                Thread.currentThread().interrupt();
+            }
+            ExtendedNettyLeakDetector.triggerLeakDetection();
+        }
     }
 }
diff --git a/conf/bkenv.sh b/conf/bkenv.sh
index 6b7cfe6d2cb..5e8c2572a58 100644
--- a/conf/bkenv.sh
+++ b/conf/bkenv.sh
@@ -89,7 +89,7 @@ if [[ -z "$BOOKIE_GC_LOG" ]]; then
 fi
 
 # Extra options to be passed to the jvm
-BOOKIE_EXTRA_OPTS="${BOOKIE_EXTRA_OPTS:-"-Dio.netty.leakDetectionLevel=disabled
 ${PULSAR_EXTRA_OPTS:-"-Dio.netty.recycler.maxCapacityPerThread=4096"}"}"
+BOOKIE_EXTRA_OPTS="${BOOKIE_EXTRA_OPTS:-"-Dio.netty.leakDetection.level=disabled
 ${PULSAR_EXTRA_OPTS:-"-Dio.netty.recycler.maxCapacityPerThread=4096"}"}"
 
 # Add extra paths to the bookkeeper classpath
 # BOOKIE_EXTRA_CLASSPATH=
diff --git a/conf/pulsar_tools_env.sh b/conf/pulsar_tools_env.sh
index 96ee304bf0b..6020e0a863a 100755
--- a/conf/pulsar_tools_env.sh
+++ b/conf/pulsar_tools_env.sh
@@ -58,7 +58,7 @@ fi
 PULSAR_MEM=${PULSAR_MEM:-"-Xmx128m -XX:MaxDirectMemorySize=128m"}
 
 # Extra options to be passed to the jvm
-PULSAR_EXTRA_OPTS="${PULSAR_MEM} ${PULSAR_GC} ${PULSAR_GC_LOG} 
-Dio.netty.leakDetectionLevel=disabled ${PULSAR_EXTRA_OPTS}"
+PULSAR_EXTRA_OPTS="${PULSAR_MEM} ${PULSAR_GC} ${PULSAR_GC_LOG} 
-Dio.netty.leakDetection.level=disabled ${PULSAR_EXTRA_OPTS}"
 
 # Add extra paths to the bookkeeper classpath
 # PULSAR_EXTRA_CLASSPATH=
diff --git a/deployment/terraform-ansible/templates/pulsar_env.sh 
b/deployment/terraform-ansible/templates/pulsar_env.sh
index 020b5d5ac3a..2638718ee55 100644
--- a/deployment/terraform-ansible/templates/pulsar_env.sh
+++ b/deployment/terraform-ansible/templates/pulsar_env.sh
@@ -48,7 +48,7 @@ PULSAR_MEM=" -Xms{{ max_heap_memory }} -Xmx{{ max_heap_memory 
}} -XX:MaxDirectMe
 PULSAR_GC=" -XX:+UseZGC -XX:+PerfDisableSharedMem -XX:+AlwaysPreTouch"
 
 # Extra options to be passed to the jvm
-PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} -Dio.netty.leakDetectionLevel=disabled 
-Dio.netty.recycler.maxCapacityPerThread=4096"
+PULSAR_EXTRA_OPTS="-Dio.netty.leakDetection.level=disabled 
-Dio.netty.recycler.maxCapacityPerThread=4096 ${PULSAR_EXTRA_OPTS}"
 
 # Add extra paths to the bookkeeper classpath
 # PULSAR_EXTRA_CLASSPATH=
diff --git a/pom.xml b/pom.xml
index f04c89e596c..d6b6fbc23c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,8 +117,40 @@ flexible messaging model and an intuitive client 
API.</description>
       -XX:+EnableDynamicAgentLoading <!-- byte-buddy-agent and mockito-core 
agent dynamic loading -->
       -Xshare:off <!-- suppress sharing warning -->
       ${test.additional.args.jdk24}
+      ${test.netty.args}
     </test.additional.args>
     <test.additional.args.jdk24></test.additional.args.jdk24>
+    <!-- Netty related JVM args, including Netty leak detection enabled in 
tests -->
+    <test.netty.args>
+      <!-- required for triggering GC to trigger leak detection synchronously 
in tests -->
+      -XX:+UnlockExperimentalVMOptions -XX:ReferencesPerThread=0 
-XX:+ParallelRefProcEnabled
+      <!--
+      enable optimized Netty direct memory buffer access
+      
https://pulsar.apache.org/docs/4.0.x/client-libraries-java-setup/#enabling-optimized-netty-direct-memory-buffer-access
+      -->
+      --add-opens java.base/java.nio=ALL-UNNAMED
+      --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
+      -Dio.netty.tryReflectionSetAccessible=true
+      -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true
+      -Dpulsar.allocator.pooled=true
+      -Dpulsar.allocator.exit_on_oom=false
+      -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
+      ${test.netty.leakdetection.args}
+    </test.netty.args>
+    <test.netty.leakdetection.args>
+      <!-- use the custom resource leak detector in buildtools module -->
+      
-Dio.netty.customResourceLeakDetector=org.apache.pulsar.tests.ExtendedNettyLeakDetector
+      
-Dorg.apache.pulsar.tests.ExtendedNettyLeakDetector.exitJvmOnLeak=${testExitJvmOnLeak}
+      
-Dorg.apache.pulsar.tests.ExtendedNettyLeakDetector.exitJvmDelayMillis=${testExitJvmOnLeakDelayMillis}
+      -Dio.netty.leakDetection.level=${testLeakDetectionLevel}
+      <!-- allows using paranoid leak detection with relatively low overhead 
-->
+      -Dio.netty.leakDetection.targetRecords=16
+      -Dio.netty.leakDetection.acquireAndReleaseOnly=true
+      -Dio.netty.leakDetection.samplingInterval=32
+    </test.netty.leakdetection.args>
+    <testExitJvmOnLeak>false</testExitJvmOnLeak>
+    <testExitJvmOnLeakDelayMillis>1000</testExitJvmOnLeakDelayMillis>
+    <testLeakDetectionLevel>paranoid</testLeakDetectionLevel>
     <testMaxHeapSize>1300M</testMaxHeapSize>
     <testReuseFork>true</testReuseFork>
     <testForkCount>4</testForkCount>
@@ -1833,12 +1865,8 @@ flexible messaging model and an intuitive client 
API.</description>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
-          <argLine>${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=${testHeapDumpPath} -XX:+ExitOnOutOfMemoryError 
-Xmx${testMaxHeapSize} -XX:+UseZGC
-            -Dpulsar.allocator.pooled=true
-            -Dpulsar.allocator.leak_detection=Advanced
-            -Dpulsar.allocator.exit_on_oom=false
-            -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
-            -Dio.netty.tryReflectionSetAccessible=true
+          <argLine>${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=${testHeapDumpPath}
+            -XX:+ExitOnOutOfMemoryError -Xmx${testMaxHeapSize} -XX:+UseZGC
             ${test.additional.args}
           </argLine>
           <reuseForks>${testReuseFork}</reuseForks>
@@ -2943,6 +2971,20 @@ flexible messaging model and an intuitive client 
API.</description>
       </properties>
     </profile>
 
+    <!-- This profile is used to disable Netty leak detection in tests. It is 
activated by setting the
+         environment variable NETTY_LEAK_DETECTION to off. -->
+    <profile>
+      <id>disableNettyLeakDetection</id>
+      <activation>
+        <property>
+          <name>env.NETTY_LEAK_DETECTION</name>
+          <value>off</value>
+        </property>
+      </activation>
+      <properties>
+       <test.netty.leakdetection.args></test.netty.leakdetection.args>
+      </properties>
+    </profile>
   </profiles>
 
   <repositories>
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
index eb31d835ec1..961c6f0176d 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
@@ -25,7 +25,6 @@ import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBufAllocator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
 import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
@@ -48,7 +47,6 @@ public class PulsarByteBufAllocatorDefaultTest {
             assertTrue(arguments.get(0) instanceof ByteBufAllocator);
             assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
             assertEquals(arguments.get(4), OutOfMemoryPolicy.FallbackToHeap);
-            assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced);
         })) {
             assertFalse(called.get());
             PulsarByteBufAllocator.createByteBufAllocator();
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
index 3f1dbc29bd9..9bb9ea1eafe 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
@@ -25,7 +25,6 @@ import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBufAllocator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
 import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
@@ -49,7 +48,6 @@ public class PulsarByteBufAllocatorOomThrowExceptionTest {
                     assertTrue(arguments.get(0) instanceof ByteBufAllocator);
                     assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
                     assertEquals(arguments.get(4), 
OutOfMemoryPolicy.ThrowException);
-                    assertEquals(arguments.get(6), 
LeakDetectionPolicy.Advanced);
                 })) {
             assertFalse(called.get());
             PulsarByteBufAllocator.createByteBufAllocator();
diff --git a/tests/bc_2_0_0/pom.xml b/tests/bc_2_0_0/pom.xml
index bf7eced9373..89930ec50c0 100644
--- a/tests/bc_2_0_0/pom.xml
+++ b/tests/bc_2_0_0/pom.xml
@@ -92,7 +92,6 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <configuration>
                             <argLine>${testJacocoAgentArgument} 
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
-                                -Dio.netty.leakDetectionLevel=advanced
                                 ${test.additional.args}
                             </argLine>
                             <skipTests>false</skipTests>
diff --git a/tests/bc_2_0_1/pom.xml b/tests/bc_2_0_1/pom.xml
index c8951b30dc2..6e4cd8c4c0b 100644
--- a/tests/bc_2_0_1/pom.xml
+++ b/tests/bc_2_0_1/pom.xml
@@ -92,7 +92,6 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <configuration>
                             <argLine>${testJacocoAgentArgument} 
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
-                                -Dio.netty.leakDetectionLevel=advanced
                                 ${test.additional.args}
                             </argLine>
                             <skipTests>false</skipTests>
diff --git a/tests/bc_2_6_0/pom.xml b/tests/bc_2_6_0/pom.xml
index a563c913e92..44b658f2549 100644
--- a/tests/bc_2_6_0/pom.xml
+++ b/tests/bc_2_6_0/pom.xml
@@ -99,7 +99,6 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <configuration>
                             <argLine>${testJacocoAgentArgument} 
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
-                                -Dio.netty.leakDetectionLevel=advanced
                                 ${test.additional.args}
                             </argLine>
                             <skipTests>false</skipTests>
diff --git a/tests/docker-images/java-test-image/Dockerfile 
b/tests/docker-images/java-test-image/Dockerfile
index 805f20a0570..9cf45130d02 100644
--- a/tests/docker-images/java-test-image/Dockerfile
+++ b/tests/docker-images/java-test-image/Dockerfile
@@ -39,3 +39,6 @@ RUN mv /etc/supervisord/conf.d/supervisord.conf 
/etc/supervisord.conf
 COPY target/certificate-authority /pulsar/certificate-authority/
 
 COPY target/java-test-functions.jar /pulsar/examples/
+
+# copy buildtools.jar to /pulsar/lib so that 
org.apache.pulsar.tests.ExtendedNettyLeakDetector can be used
+COPY target/buildtools.jar /pulsar/lib/
\ No newline at end of file
diff --git a/tests/docker-images/java-test-image/pom.xml 
b/tests/docker-images/java-test-image/pom.xml
index cec56c2f237..3d265d37dba 100644
--- a/tests/docker-images/java-test-image/pom.xml
+++ b/tests/docker-images/java-test-image/pom.xml
@@ -91,6 +91,15 @@
                       
<outputDirectory>${project.build.directory}</outputDirectory>
                       
<destFileName>pulsar-server-distribution-bin.tar.gz</destFileName>
                     </artifactItem>
+                    <artifactItem>
+                      <groupId>org.apache.pulsar</groupId>
+                      <artifactId>buildtools</artifactId>
+                      <version>${project.parent.version}</version>
+                      <type>jar</type>
+                      <overWrite>true</overWrite>
+                      
<outputDirectory>${project.build.directory}</outputDirectory>
+                      <destFileName>buildtools.jar</destFileName>
+                    </artifactItem>
                   </artifactItems>
                 </configuration>
               </execution>
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/Dockerfile
index 46271e4cb54..074f411b952 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -64,6 +64,9 @@ RUN chmod g+rx /pulsar/examples/python-examples/
 # copy java test examples
 COPY target/java-test-functions.jar /pulsar/examples/
 
+# copy buildtools.jar to /pulsar/lib so that 
org.apache.pulsar.tests.ExtendedNettyLeakDetector can be used
+COPY target/buildtools.jar /pulsar/lib/
+
 # copy go test examples
 COPY --from=pulsar-function-go /go/bin /pulsar/examples/go-examples
 
diff --git a/tests/docker-images/latest-version-image/pom.xml 
b/tests/docker-images/latest-version-image/pom.xml
index 6c538f17b7f..29108e529b7 100644
--- a/tests/docker-images/latest-version-image/pom.xml
+++ b/tests/docker-images/latest-version-image/pom.xml
@@ -94,6 +94,15 @@
                       
<outputDirectory>${project.build.directory}/plugins</outputDirectory>
                       <destFileName>java-test-plugins.nar</destFileName>
                     </artifactItem>
+                    <artifactItem>
+                      <groupId>org.apache.pulsar</groupId>
+                      <artifactId>buildtools</artifactId>
+                      <version>${project.parent.version}</version>
+                      <type>jar</type>
+                      <overWrite>true</overWrite>
+                      
<outputDirectory>${project.build.directory}</outputDirectory>
+                      <destFileName>buildtools.jar</destFileName>
+                    </artifactItem>
                   </artifactItems>
                 </configuration>
               </execution>
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 8bb2ce7c7f9..9cab897cdb3 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -302,7 +302,7 @@
             <artifactId>maven-surefire-plugin</artifactId>
             <configuration>
               <argLine>${testJacocoAgentArgument} -XX:+ExitOnOutOfMemoryError 
-Xmx1G -XX:MaxDirectMemorySize=1G
-              -Dio.netty.leakDetectionLevel=advanced 
-Dconfluent.version=${confluent.version} 
-Djacoco.version=${jacoco-maven-plugin.version}
+              -Dconfluent.version=${confluent.version} 
-Djacoco.version=${jacoco-maven-plugin.version}
               
-Dintegrationtest.coverage.enabled=${integrationtest.coverage.enabled} 
-Dintegrationtest.coverage.dir=${integrationtest.coverage.dir}
               ${test.additional.args}
               </argLine>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
index 9ff48b2e6a1..64806a1f9ab 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
@@ -30,4 +30,9 @@ public class BKContainer extends PulsarContainer<BKContainer> 
{
             clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, 
INVALID_PORT);
         tailContainerLog();
     }
+
+    @Override
+    protected boolean isPassNettyLeakDetectionSystemProperties() {
+        return false;
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
index de67d567233..20c7a79dc44 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
@@ -39,4 +39,9 @@ public class CSContainer extends PulsarContainer<CSContainer> 
{
     protected boolean isCodeCoverageEnabled() {
         return false;
     }
+
+    @Override
+    protected boolean isPassNettyLeakDetectionSystemProperties() {
+        return false;
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index eb0acf33a89..59af8b54b9a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -19,12 +19,11 @@
 package org.apache.pulsar.tests.integration.containers;
 
 import com.github.dockerjava.api.DockerClient;
-
 import java.util.Base64;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.testcontainers.containers.GenericContainer;
@@ -48,6 +47,29 @@ public class ChaosContainer<SelfT extends 
ChaosContainer<SelfT>> extends Generic
         addEnv("MALLOC_ARENA_MAX", "1");
     }
 
+    /** Adds {@code value} to env variable {@code key}, space-separated after any value already set. */
+    protected void appendToEnv(String key, String value) {
+        String current = getEnvMap().get(key);
+        String combined = current != null
+                ? current + " " + value
+                : value;
+        addEnv(key, combined);
+    }
+
+    /** Passes the JVM's current value of the system property; nothing is added when the property is unset. */
+    protected void passSystemPropertyInEnv(String envKey, String systemPropertyName) {
+        passSystemPropertyInEnv(envKey, systemPropertyName, System.getProperty(systemPropertyName));
+    }
+
+    protected void passSystemPropertyInEnv(String envKey, String systemPropertyName, String systemPropertyValue) {
+        if (systemPropertyValue == null) {
+            return;
+        }
+        String option = "-D" + systemPropertyName + "=" + systemPropertyValue;
+        // the whole -D option is wrapped in double quotes when its value contains whitespace
+        appendToEnv(envKey, StringUtils.containsWhitespace(systemPropertyValue) ? "\"" + option + "\"" : option);
+    }
+
     protected void beforeStop() {
         if (null == getContainerId()) {
             return;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 3cdb048aea5..104590045ce 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.tests.integration.containers;
 
 import static java.time.temporal.ChronoUnit.SECONDS;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -29,6 +28,7 @@ import java.util.UUID;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.tests.ExtendedNettyLeakDetector;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.testcontainers.containers.BindMode;
@@ -252,12 +252,42 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
             configureCodeCoverage();
         }
 
+        if (isPassNettyLeakDetectionSystemProperties()) {
+            passNettyLeakDetectionSystemProperties();
+        }
+
         beforeStart();
         super.start();
         afterStart();
         log.info("[{}] Start pulsar service {} at container {}", 
getContainerName(), serviceName, getContainerId());
     }
 
+    protected boolean isPassNettyLeakDetectionSystemProperties() {
+        return true;
+    }
+
+    /** Appends this JVM's Netty leak detection options to PULSAR_EXTRA_OPTS and sets NETTY_LEAK_DUMP_DIR. */
+    protected void passNettyLeakDetectionSystemProperties() {
+        if (!isPassNettyLeakDetectionSystemProperties()) {
+            return;
+        }
+        final String envKey = "PULSAR_EXTRA_OPTS";
+        // defaults similar to the ones in conf/pulsar_env.sh
+        appendToEnv(envKey, "-Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacityPerThread=4096");
+        passSystemPropertyInEnv(envKey, ExtendedNettyLeakDetector.NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME);
+        if (ExtendedNettyLeakDetector.isExtendedNettyLeakDetectorEnabled()) {
+            // turn on the extended leak detector's shutdown hook inside the container
+            passSystemPropertyInEnv(envKey, ExtendedNettyLeakDetector.USE_SHUTDOWN_HOOK_SYSTEM_PROPERTY_NAME, "true");
+        }
+        passSystemPropertyInEnv(envKey, ExtendedNettyLeakDetector.EXIT_JVM_ON_LEAK_SYSTEM_PROPERTY_NAME);
+        passSystemPropertyInEnv(envKey, ExtendedNettyLeakDetector.EXIT_JVM_DELAY_MILLIS_SYSTEM_PROPERTY_NAME);
+        for (String nettyProperty : new String[] {"io.netty.leakDetection.level", "io.netty.leakDetection.targetRecords",
+                "io.netty.leakDetection.samplingInterval", "io.netty.leakDetection.acquireAndReleaseOnly"}) {
+            passSystemPropertyInEnv(envKey, nettyProperty);
+        }
+        addEnv("NETTY_LEAK_DUMP_DIR", "/var/log/pulsar");
+    }
+
     protected boolean isCodeCoverageEnabled() {
         return Boolean.getBoolean("integrationtest.coverage.enabled");
     }
@@ -286,7 +316,7 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
-            withEnv("OPTS", "-javaagent:/jacocoDir/" + jacocoAgentJar.getName()
+            appendToEnv("OPTS", "-javaagent:/jacocoDir/" + 
jacocoAgentJar.getName()
                     + "=destfile=/jacocoDir/jacoco_" + getContainerName() + 
"_" + System.currentTimeMillis() + ".exec"
                     + 
",includes=org.apache.pulsar.*:org.apache.bookkeeper.mledger.*"
                     + ",excludes=*.proto.*:*.shade.*:*.shaded.*");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
index c55eb3242b4..2a372475120 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
@@ -39,4 +39,10 @@ public class ZKContainer extends 
PulsarContainer<ZKContainer> {
     protected boolean isCodeCoverageEnabled() {
         return false;
     }
+
+
+    @Override
+    protected boolean isPassNettyLeakDetectionSystemProperties() {
+        return false;
+    }
 }
diff --git a/tests/pulsar-client-admin-shade-test/pom.xml 
b/tests/pulsar-client-admin-shade-test/pom.xml
index 1ce482ad232..f48b840f1ca 100644
--- a/tests/pulsar-client-admin-shade-test/pom.xml
+++ b/tests/pulsar-client-admin-shade-test/pom.xml
@@ -107,7 +107,6 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <configuration>
                             <argLine>${testJacocoAgentArgument} 
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
-                                -Dio.netty.leakDetectionLevel=advanced
                                 ${test.additional.args}
                             </argLine>
                             <skipTests>false</skipTests>
diff --git a/tests/pulsar-client-all-shade-test/pom.xml 
b/tests/pulsar-client-all-shade-test/pom.xml
index ce2705bb3c9..2e11e8fe53d 100644
--- a/tests/pulsar-client-all-shade-test/pom.xml
+++ b/tests/pulsar-client-all-shade-test/pom.xml
@@ -106,7 +106,6 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <configuration>
                             <argLine>${testJacocoAgentArgument} 
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
-                                -Dio.netty.leakDetectionLevel=advanced
                                 ${test.additional.args}
                             </argLine>
                             <skipTests>false</skipTests>
diff --git a/tests/pulsar-client-shade-test/pom.xml 
b/tests/pulsar-client-shade-test/pom.xml
index 7a5746c7861..49ba7c38494 100644
--- a/tests/pulsar-client-shade-test/pom.xml
+++ b/tests/pulsar-client-shade-test/pom.xml
@@ -101,7 +101,6 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <configuration>
                             <argLine>${testJacocoAgentArgument} 
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:MaxDirectMemorySize=8G
-                                -Dio.netty.leakDetectionLevel=advanced
                                 ${test.additional.args}
                             </argLine>
                             <skipTests>false</skipTests>

Reply via email to