This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5c0fec288f [INLONG-12064][Audit] Static ScheduledExecutorService in
PulsarSink causes ClassLoader leaks and shared state corruption (#12085)
5c0fec288f is described below
commit 5c0fec288fe46155e98e29109f7c0d877217d030
Author: doleyzi <[email protected]>
AuthorDate: Thu Feb 26 17:08:16 2026 +0800
[INLONG-12064][Audit] Static ScheduledExecutorService in PulsarSink causes
ClassLoader leaks and shared state corruption (#12085)
Co-authored-by: doleyzi <[email protected]>
---
.../org/apache/inlong/audit/sink/PulsarSink.java | 68 +++++++++++++++++-----
1 file changed, 54 insertions(+), 14 deletions(-)
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
index c99cb9ce16..c98fca78d1 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -47,6 +47,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -145,22 +147,16 @@ public class PulsarSink extends AbstractSink
private static final PulsarPerformanceTask pulsarPerformanceTask = new
PulsarPerformanceTask();
- private static ScheduledExecutorService scheduledExecutorService =
Executors
- .newScheduledThreadPool(1, new
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
+ private static volatile ScheduledExecutorService scheduledExecutorService;
+
+ private static final AtomicBoolean schedulerStarted = new
AtomicBoolean(false);
+
+ private static final AtomicInteger activeInstances = new AtomicInteger(0);
private String topic;
private Context context;
- static {
- /*
- * stat pulsar performance
- */
- logger.info("init pulsarPerformanceTask");
- scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask,
0L,
- PRINT_INTERVAL, TimeUnit.SECONDS);
- }
-
public PulsarSink() {
super();
logger.debug("new instance of PulsarSink!");
@@ -199,6 +195,46 @@ public class PulsarSink extends AbstractSink
}
}
+ /**
+ * Start the performance scheduler if not already started.
+ * Uses compareAndSet to ensure the scheduler is only initialized once
across all instances.
+ */
+ private static synchronized void startPerformanceScheduler() {
+ if (!schedulerStarted.compareAndSet(false, true)) {
+ return;
+ }
+ scheduledExecutorService = Executors.newScheduledThreadPool(1,
+ new
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
+ scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask,
PRINT_INTERVAL,
+ PRINT_INTERVAL, TimeUnit.SECONDS);
+ logger.info("PulsarPerformanceTask scheduler started");
+ }
+
+ /**
+ * Stop the performance scheduler when the last active instance is stopped.
+ * Uses reference counting to ensure the scheduler is only shut down
+ * when no more active instances remain.
+ */
+ private static synchronized void stopPerformanceScheduler() {
+ if (activeInstances.get() <= 0) {
+ logger.warn("No active instances to stop, skipping scheduler
shutdown");
+ return;
+ }
+
+ if (activeInstances.decrementAndGet() > 0) {
+ logger.info("Still have active instances, not shutting down
scheduler");
+ return;
+ }
+
+ if (scheduledExecutorService != null &&
!scheduledExecutorService.isShutdown()) {
+ logger.info("Shutting down pulsarPerformanceTask scheduler");
+ scheduledExecutorService.shutdownNow();
+ scheduledExecutorService = null;
+ }
+ schedulerStarted.set(false);
+ logger.info("PulsarPerformanceTask scheduler stopped");
+ }
+
private void initTopic() throws Exception {
long startTime = System.currentTimeMillis();
if (topic != null) {
@@ -230,6 +266,10 @@ public class PulsarSink extends AbstractSink
+ i);
sinkThreadPool[i].start();
}
+
+ activeInstances.incrementAndGet();
+ startPerformanceScheduler();
+
logger.debug("meta sink started");
}
@@ -258,9 +298,9 @@ public class PulsarSink extends AbstractSink
sinkThreadPool = null;
}
super.stop();
- if (!scheduledExecutorService.isShutdown()) {
- scheduledExecutorService.shutdown();
- }
+
+ stopPerformanceScheduler();
+
sinkCounter.stop();
logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
}