This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new adb656e298 [Hotfix][Zeta][Log] Fix zeta parallelStream log trace for 
job (#8456)
adb656e298 is described below

commit adb656e29815e36b8bbcc383e7b400bc38f050c9
Author: hailin0 <[email protected]>
AuthorDate: Tue Jan 7 10:26:44 2025 +0800

    [Hotfix][Zeta][Log] Fix zeta parallelStream log trace for job (#8456)
---
 .../apache/seatunnel/api/tracing/MDCCallable.java  |  15 +-
 .../seatunnel/api/tracing/MDCComparator.java       |  14 +-
 .../apache/seatunnel/api/tracing/MDCConsumer.java  |  14 +-
 .../apache/seatunnel/api/tracing/MDCContext.java   | 153 +++++++----
 .../apache/seatunnel/api/tracing/MDCExecutor.java  |   2 +-
 .../seatunnel/api/tracing/MDCExecutorService.java  |  14 +-
 .../apache/seatunnel/api/tracing/MDCFunction.java  |  14 +-
 .../apache/seatunnel/api/tracing/MDCPredicate.java |  14 +-
 .../apache/seatunnel/api/tracing/MDCRunnable.java  |  16 +-
 .../api/tracing/MDCScheduledExecutorService.java   |  10 +-
 .../apache/seatunnel/api/tracing/MDCStream.java    |  37 +--
 .../tracing/{MDCConsumer.java => MDCSupplier.java} |  19 +-
 .../apache/seatunnel/api/tracing/MDCTracer.java    |  32 +++
 .../seatunnel/api/tracing/MDCTracerTest.java       | 280 +++++++++++++++++----
 .../server/task/operation/TracingOperation.java    |   6 +-
 15 files changed, 466 insertions(+), 174 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java
index f3cae160da..b56101e8c7 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.tracing;
 
 import java.util.concurrent.Callable;
+import java.util.function.Supplier;
 
 /**
  * Callable that sets MDC context before calling the delegate and clears it 
afterwards.
@@ -25,7 +26,7 @@ import java.util.concurrent.Callable;
  * @param <V>
  */
 public class MDCCallable<V> implements Callable<V> {
-    private final MDCContext context;
+    private final Supplier<MDCContext> contextSupplier;
     private final Callable<V> delegate;
 
     public MDCCallable(Callable<V> delegate) {
@@ -33,18 +34,18 @@ public class MDCCallable<V> implements Callable<V> {
     }
 
     public MDCCallable(MDCContext context, Callable<V> delegate) {
-        this.context = context;
+        this(() -> context, delegate);
+    }
+
+    public MDCCallable(Supplier<MDCContext> contextSupplier, Callable<V> 
delegate) {
+        this.contextSupplier = contextSupplier;
         this.delegate = delegate;
     }
 
     @Override
     public V call() throws Exception {
-        try {
-            context.put();
-
+        try (MDCContext ignored = contextSupplier.get().activate()) {
             return delegate.call();
-        } finally {
-            context.clear();
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
index 5672cf4392..8921b5356b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
@@ -18,9 +18,10 @@
 package org.apache.seatunnel.api.tracing;
 
 import java.util.Comparator;
+import java.util.function.Supplier;
 
 public class MDCComparator<T> implements Comparator<T> {
-    private final MDCContext context;
+    private final Supplier<MDCContext> contextSupplier;
     private final Comparator<T> delegate;
 
     public MDCComparator(Comparator<T> delegate) {
@@ -28,17 +29,18 @@ public class MDCComparator<T> implements Comparator<T> {
     }
 
     public MDCComparator(MDCContext context, Comparator<T> delegate) {
-        this.context = context;
+        this(() -> context, delegate);
+    }
+
+    public MDCComparator(Supplier<MDCContext> contextSupplier, Comparator<T> 
delegate) {
+        this.contextSupplier = contextSupplier;
         this.delegate = delegate;
     }
 
     @Override
     public int compare(T o1, T o2) {
-        try {
-            context.put();
+        try (MDCContext ignored = contextSupplier.get().activate()) {
             return delegate.compare(o1, o2);
-        } finally {
-            context.clear();
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
index 6317f038a0..ecfb3b5a66 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
@@ -18,9 +18,10 @@
 package org.apache.seatunnel.api.tracing;
 
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 public class MDCConsumer<T> implements Consumer<T> {
-    private final MDCContext context;
+    private final Supplier<MDCContext> contextSupplier;
     private final Consumer<T> delegate;
 
     public MDCConsumer(Consumer<T> delegate) {
@@ -28,17 +29,18 @@ public class MDCConsumer<T> implements Consumer<T> {
     }
 
     public MDCConsumer(MDCContext context, Consumer<T> delegate) {
-        this.context = context;
+        this(() -> context, delegate);
+    }
+
+    public MDCConsumer(Supplier<MDCContext> contextSupplier, Consumer<T> 
delegate) {
+        this.contextSupplier = contextSupplier;
         this.delegate = delegate;
     }
 
     @Override
     public void accept(T t) {
-        try {
-            context.put();
+        try (MDCContext ignored = contextSupplier.get().activate()) {
             delegate.accept(t);
-        } finally {
-            context.clear();
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java
index 4a43937459..e343e5be1b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java
@@ -19,22 +19,37 @@ package org.apache.seatunnel.api.tracing;
 
 import org.slf4j.MDC;
 
-import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.Closeable;
 import java.io.Serializable;
 
 /**
  * MDC context for tracing.
  *
  * <p>reference: https://www.slf4j.org/manual.html#mdc
+ *
+ * <p>Example:
+ *
+ * <pre>
+ *     try (MDCContext ctx = MDCContext.of(jobId, pipelineId, 
taskId).activate()) {
+ *          // do something
+ *          new Thread(new MDCRunnable(MDCContext.current(), new Runnable() {
+ *             @Override
+ *             public void run() {
+ *                  // do something
+ *             }
+ *          }))
+ *          .start();
+ *     }
+ *     // MDC context will be restored after the try block
+ * </pre>
  */
 @Slf4j
-@Builder
 @EqualsAndHashCode
-public class MDCContext implements Serializable {
-    private static final MDCContext EMPTY = MDCContext.builder().build();
+public class MDCContext implements Serializable, Closeable {
+    private static final MDCContext EMPTY = new MDCContext(null, null, null);
     private static final String EMPTY_TO_STRING = "NA";
 
     public static final String JOB_ID = "ST-JID";
@@ -44,55 +59,24 @@ public class MDCContext implements Serializable {
     private final Long jobId;
     private final Long pipelineId;
     private final Long taskId;
+    private transient volatile MDCContext toRestore;
 
-    public static MDCContext of(long jobId) {
-        return MDCContext.builder().jobId(jobId).build();
-    }
-
-    public static MDCContext of(long jobId, long pipelineId) {
-        return 
MDCContext.builder().jobId(jobId).pipelineId(pipelineId).build();
-    }
-
-    public static MDCContext of(long jobId, long pipelineId, long taskId) {
-        return 
MDCContext.builder().jobId(jobId).pipelineId(pipelineId).taskId(taskId).build();
-    }
-
-    public static MDCContext current() {
-        return MDCContext.builder()
-                .jobId(MDC.get(JOB_ID) != null ? 
Long.parseLong(MDC.get(JOB_ID)) : null)
-                .pipelineId(
-                        MDC.get(PIPELINE_ID) != null ? 
Long.parseLong(MDC.get(PIPELINE_ID)) : null)
-                .taskId(MDC.get(TASK_ID) != null ? 
Long.parseLong(MDC.get(TASK_ID)) : null)
-                .build();
+    public MDCContext(Long jobId, Long pipelineId, Long taskId) {
+        this.jobId = jobId;
+        this.pipelineId = pipelineId;
+        this.taskId = taskId;
     }
 
-    public static MDCContext valueOf(String s) {
-        if (EMPTY_TO_STRING.equals(s)) {
-            return EMPTY;
+    public synchronized MDCContext activate() {
+        if (this == EMPTY) {
+            return this;
         }
 
-        String[] arr = s.split("/");
-        Long jobId = Long.parseLong(arr[0]);
-        Long pipelineId = Long.parseLong(arr[1]);
-        Long taskId = Long.parseLong(arr[2]);
-        if (pipelineId == 0 || taskId == 0) {
-            return MDCContext.of(jobId);
-        }
-        return MDCContext.of(jobId, pipelineId, taskId);
-    }
-
-    @Override
-    public String toString() {
-        if (jobId != null) {
-            return String.format(
-                    "%d/%d/%d",
-                    jobId, pipelineId == null ? 0 : pipelineId, taskId == null 
? 0 : taskId);
-        } else {
-            return EMPTY_TO_STRING;
+        if (this.toRestore != null) {
+            throw new IllegalStateException("MDCContext is already activated");
         }
-    }
+        this.toRestore = current();
 
-    public void put() {
         try {
             if (jobId != null) {
                 MDC.put(JOB_ID, String.valueOf(jobId));
@@ -107,9 +91,18 @@ public class MDCContext implements Serializable {
             log.error("Failed to put MDC context", e);
             throw e;
         }
+        return this;
     }
 
-    public void clear() {
+    public synchronized MDCContext deactivate() {
+        if (this == EMPTY) {
+            return this;
+        }
+
+        if (this.toRestore == null) {
+            throw new IllegalStateException("MDCContext is not activated");
+        }
+
         try {
             MDC.remove(JOB_ID);
             MDC.remove(PIPELINE_ID);
@@ -118,5 +111,71 @@ public class MDCContext implements Serializable {
             log.error("Failed to clear MDC context", e);
             throw e;
         }
+
+        if (this.toRestore != null) {
+            this.toRestore.activate();
+        }
+
+        return this;
+    }
+
+    @Override
+    public void close() {
+        deactivate();
+    }
+
+    @Override
+    public String toString() {
+        if (this == EMPTY) {
+            return EMPTY_TO_STRING;
+        }
+        return String.format(
+                "%d/%d/%d",
+                jobId, pipelineId == null ? 0 : pipelineId, taskId == null ? 0 
: taskId);
+    }
+
+    public static MDCContext of(long jobId) {
+        return new MDCContext(jobId, null, null);
+    }
+
+    public static MDCContext of(long jobId, long pipelineId) {
+        return new MDCContext(jobId, pipelineId, null);
+    }
+
+    public static MDCContext of(long jobId, long pipelineId, long taskId) {
+        return new MDCContext(jobId, pipelineId, taskId);
+    }
+
+    public static MDCContext of(MDCContext context) {
+        return new MDCContext(context.jobId, context.pipelineId, 
context.taskId);
+    }
+
+    public static MDCContext current() {
+        String jobId = MDC.get(JOB_ID);
+        if (jobId == null) {
+            return EMPTY;
+        }
+
+        String pipelineId = MDC.get(PIPELINE_ID);
+        String taskId = MDC.get(TASK_ID);
+        return new MDCContext(
+                Long.parseLong(jobId),
+                pipelineId != null ? Long.parseLong(pipelineId) : null,
+                taskId != null ? Long.parseLong(taskId) : null);
+    }
+
+    public static MDCContext valueOf(String s) {
+        if (EMPTY_TO_STRING.equals(s)) {
+            return EMPTY;
+        }
+
+        String[] arr = s.split("/");
+        Long jobId = Long.parseLong(arr[0]);
+        Long pipelineId = Long.parseLong(arr[1]);
+        Long taskId = Long.parseLong(arr[2]);
+        if (pipelineId == 0 || taskId == 0) {
+            return MDCContext.of(jobId);
+        }
+        return MDCContext.of(jobId, pipelineId, taskId);
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java
index 1651f7d6c4..e6d02df78e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java
@@ -31,6 +31,6 @@ public class MDCExecutor implements Executor {
 
     @Override
     public void execute(Runnable command) {
-        delegate.execute(new MDCRunnable(context, command));
+        delegate.execute(new MDCRunnable(MDCContext.of(context), command));
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
index 7ef93f41df..f010dfe717 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
@@ -65,17 +65,17 @@ public class MDCExecutorService extends MDCExecutor 
implements ExecutorService {
 
     @Override
     public <T> Future<T> submit(Callable<T> task) {
-        return delegate.submit(new MDCCallable<>(context, task));
+        return delegate.submit(new MDCCallable<>(MDCContext.of(context), 
task));
     }
 
     @Override
     public <T> Future<T> submit(Runnable task, T result) {
-        return delegate.submit(new MDCRunnable(context, task), result);
+        return delegate.submit(new MDCRunnable(MDCContext.of(context), task), 
result);
     }
 
     @Override
     public Future<?> submit(Runnable task) {
-        return delegate.submit(new MDCRunnable(context, task));
+        return delegate.submit(new MDCRunnable(MDCContext.of(context), task));
     }
 
     @Override
@@ -83,7 +83,7 @@ public class MDCExecutorService extends MDCExecutor 
implements ExecutorService {
             throws InterruptedException {
         return delegate.invokeAll(
                 tasks.stream()
-                        .map(task -> new MDCCallable<>(context, task))
+                        .map(task -> new MDCCallable<>(MDCContext.of(context), 
task))
                         .collect(Collectors.toList()));
     }
 
@@ -93,7 +93,7 @@ public class MDCExecutorService extends MDCExecutor 
implements ExecutorService {
             throws InterruptedException {
         return delegate.invokeAll(
                 tasks.stream()
-                        .map(task -> new MDCCallable<>(context, task))
+                        .map(task -> new MDCCallable<>(MDCContext.of(context), 
task))
                         .collect(Collectors.toList()),
                 timeout,
                 unit);
@@ -104,7 +104,7 @@ public class MDCExecutorService extends MDCExecutor 
implements ExecutorService {
             throws InterruptedException, ExecutionException {
         return delegate.invokeAny(
                 tasks.stream()
-                        .map(task -> new MDCCallable<>(context, task))
+                        .map(task -> new MDCCallable<>(MDCContext.of(context), 
task))
                         .collect(Collectors.toList()));
     }
 
@@ -113,7 +113,7 @@ public class MDCExecutorService extends MDCExecutor 
implements ExecutorService {
             throws InterruptedException, ExecutionException, TimeoutException {
         return delegate.invokeAny(
                 tasks.stream()
-                        .map(task -> new MDCCallable<>(context, task))
+                        .map(task -> new MDCCallable<>(MDCContext.of(context), 
task))
                         .collect(Collectors.toList()),
                 timeout,
                 unit);
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java
index b8f0d22248..c4dc2463c4 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java
@@ -18,9 +18,10 @@
 package org.apache.seatunnel.api.tracing;
 
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 public class MDCFunction<T, R> implements Function<T, R> {
-    private final MDCContext context;
+    private final Supplier<MDCContext> contextSupplier;
     protected final Function<T, R> delegate;
 
     public MDCFunction(Function<T, R> delegate) {
@@ -28,17 +29,18 @@ public class MDCFunction<T, R> implements Function<T, R> {
     }
 
     public MDCFunction(MDCContext context, Function<T, R> delegate) {
-        this.context = context;
+        this(() -> context, delegate);
+    }
+
+    public MDCFunction(Supplier<MDCContext> contextSupplier, Function<T, R> 
delegate) {
+        this.contextSupplier = contextSupplier;
         this.delegate = delegate;
     }
 
     @Override
     public R apply(T t) {
-        try {
-            context.put();
+        try (MDCContext ignored = contextSupplier.get().activate()) {
             return delegate.apply(t);
-        } finally {
-            context.clear();
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
index d04edc064b..d01c290b57 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
@@ -18,9 +18,10 @@
 package org.apache.seatunnel.api.tracing;
 
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 public class MDCPredicate<T> implements Predicate<T> {
-    private final MDCContext context;
+    private final Supplier<MDCContext> contextSupplier;
     private final Predicate<T> delegate;
 
     public MDCPredicate(Predicate<T> delegate) {
@@ -28,17 +29,18 @@ public class MDCPredicate<T> implements Predicate<T> {
     }
 
     public MDCPredicate(MDCContext context, Predicate<T> delegate) {
-        this.context = context;
+        this(() -> context, delegate);
+    }
+
+    public MDCPredicate(Supplier<MDCContext> contextSupplier, Predicate<T> 
delegate) {
+        this.contextSupplier = contextSupplier;
         this.delegate = delegate;
     }
 
     @Override
     public boolean test(T t) {
-        try {
-            context.put();
+        try (MDCContext ignored = contextSupplier.get().activate()) {
             return delegate.test(t);
-        } finally {
-            context.clear();
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java
index e6d310de10..02544db696 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java
@@ -17,9 +17,11 @@
 
 package org.apache.seatunnel.api.tracing;
 
+import java.util.function.Supplier;
+
 /** Runnable that sets MDC context before calling the delegate and clears it 
afterwards. */
 public class MDCRunnable implements Runnable {
-    private final MDCContext context;
+    private final Supplier<MDCContext> contextSupplier;
     private final Runnable delegate;
 
     public MDCRunnable(Runnable delegate) {
@@ -27,18 +29,18 @@ public class MDCRunnable implements Runnable {
     }
 
     public MDCRunnable(MDCContext context, Runnable delegate) {
-        this.context = context;
+        this(() -> context, delegate);
+    }
+
+    public MDCRunnable(Supplier<MDCContext> contextSupplier, Runnable 
delegate) {
+        this.contextSupplier = contextSupplier;
         this.delegate = delegate;
     }
 
     @Override
     public void run() {
-        try {
-            context.put();
-
+        try (MDCContext ignored = contextSupplier.get().activate()) {
             delegate.run();
-        } finally {
-            context.clear();
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
index 804e953ace..ed43a9fa9c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
@@ -39,25 +39,27 @@ public class MDCScheduledExecutorService extends 
MDCExecutorService
 
     @Override
     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
-        return delegate.schedule(new MDCRunnable(context, command), delay, 
unit);
+        return delegate.schedule(
+                new MDCRunnable(() -> MDCContext.of(context), command), delay, 
unit);
     }
 
     @Override
     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
-        return delegate.schedule(new MDCCallable<>(context, callable), delay, 
unit);
+        return delegate.schedule(
+                new MDCCallable<>(() -> MDCContext.of(context), callable), 
delay, unit);
     }
 
     @Override
     public ScheduledFuture<?> scheduleAtFixedRate(
             Runnable command, long initialDelay, long period, TimeUnit unit) {
         return delegate.scheduleAtFixedRate(
-                new MDCRunnable(context, command), initialDelay, period, unit);
+                new MDCRunnable(() -> MDCContext.of(context), command), 
initialDelay, period, unit);
     }
 
     @Override
     public ScheduledFuture<?> scheduleWithFixedDelay(
             Runnable command, long initialDelay, long delay, TimeUnit unit) {
         return delegate.scheduleWithFixedDelay(
-                new MDCRunnable(context, command), initialDelay, delay, unit);
+                new MDCRunnable(() -> MDCContext.of(context), command), 
initialDelay, delay, unit);
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java
index 871af5cd60..3f85d55c79 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java
@@ -53,62 +53,69 @@ public class MDCStream<T> implements Stream<T> {
 
     @Override
     public Stream<T> filter(Predicate<? super T> predicate) {
-        return new MDCStream<>(context, delegate.filter(new 
MDCPredicate<>(context, predicate)));
+        return new MDCStream<>(
+                context,
+                delegate.filter(new MDCPredicate<>(() -> 
MDCContext.of(context), predicate)));
     }
 
     @Override
     public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
-        return new MDCStream<>(context, delegate.map(new 
MDCFunction<>(context, mapper)));
+        return new MDCStream<>(
+                context, delegate.map(new MDCFunction<>(() -> 
MDCContext.of(context), mapper)));
     }
 
     @Override
     public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? 
extends R>> mapper) {
-        return new MDCStream<>(context, delegate.flatMap(new 
MDCFunction<>(context, mapper)));
+        return new MDCStream<>(
+                context, delegate.flatMap(new MDCFunction<>(() -> 
MDCContext.of(context), mapper)));
     }
 
     @Override
     public Stream<T> sorted(Comparator<? super T> comparator) {
-        return new MDCStream<>(context, delegate.sorted(new 
MDCComparator<>(context, comparator)));
+        return new MDCStream<>(
+                context,
+                delegate.sorted(new MDCComparator<>(() -> 
MDCContext.of(context), comparator)));
     }
 
     @Override
     public Stream<T> peek(Consumer<? super T> action) {
-        return new MDCStream<>(context, delegate.peek(new 
MDCConsumer<>(context, action)));
+        return new MDCStream<>(
+                context, delegate.peek(new MDCConsumer<>(() -> 
MDCContext.of(context), action)));
     }
 
     @Override
     public void forEach(Consumer<? super T> action) {
-        delegate.forEach(new MDCConsumer<>(context, action));
+        delegate.forEach(new MDCConsumer<>(() -> MDCContext.of(context), 
action));
     }
 
     @Override
     public void forEachOrdered(Consumer<? super T> action) {
-        delegate.forEachOrdered(new MDCConsumer<>(context, action));
+        delegate.forEachOrdered(new MDCConsumer<>(() -> 
MDCContext.of(context), action));
     }
 
     @Override
     public Optional<T> min(Comparator<? super T> comparator) {
-        return delegate.min(new MDCComparator<>(context, comparator));
+        return delegate.min(new MDCComparator<>(() -> MDCContext.of(context), 
comparator));
     }
 
     @Override
     public Optional<T> max(Comparator<? super T> comparator) {
-        return delegate.max(new MDCComparator<>(context, comparator));
+        return delegate.max(new MDCComparator<>(() -> MDCContext.of(context), 
comparator));
     }
 
     @Override
     public boolean anyMatch(Predicate<? super T> predicate) {
-        return delegate.anyMatch(new MDCPredicate<>(context, predicate));
+        return delegate.anyMatch(new MDCPredicate<>(() -> 
MDCContext.of(context), predicate));
     }
 
     @Override
     public boolean allMatch(Predicate<? super T> predicate) {
-        return delegate.allMatch(new MDCPredicate<>(context, predicate));
+        return delegate.allMatch(new MDCPredicate<>(() -> 
MDCContext.of(context), predicate));
     }
 
     @Override
     public boolean noneMatch(Predicate<? super T> predicate) {
-        return delegate.noneMatch(new MDCPredicate<>(context, predicate));
+        return delegate.noneMatch(new MDCPredicate<>(() -> 
MDCContext.of(context), predicate));
     }
 
     @Override
@@ -153,17 +160,17 @@ public class MDCStream<T> implements Stream<T> {
 
     @Override
     public IntStream flatMapToInt(Function<? super T, ? extends IntStream> 
mapper) {
-        return delegate.flatMapToInt(new MDCFunction<>(context, mapper));
+        return delegate.flatMapToInt(new MDCFunction<>(() -> 
MDCContext.of(context), mapper));
     }
 
     @Override
     public LongStream flatMapToLong(Function<? super T, ? extends LongStream> 
mapper) {
-        return delegate.flatMapToLong(new MDCFunction<>(context, mapper));
+        return delegate.flatMapToLong(new MDCFunction<>(() -> 
MDCContext.of(context), mapper));
     }
 
     @Override
     public DoubleStream flatMapToDouble(Function<? super T, ? extends 
DoubleStream> mapper) {
-        return delegate.flatMapToDouble(new MDCFunction<>(context, mapper));
+        return delegate.flatMapToDouble(new MDCFunction<>(() -> 
MDCContext.of(context), mapper));
     }
 
     @Override
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCSupplier.java
similarity index 72%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCSupplier.java
index 6317f038a0..6cd3c8e8c6 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCSupplier.java
@@ -17,28 +17,25 @@
 
 package org.apache.seatunnel.api.tracing;
 
-import java.util.function.Consumer;
+import java.util.function.Supplier;
 
-public class MDCConsumer<T> implements Consumer<T> {
+public class MDCSupplier<T> implements Supplier<T> {
     private final MDCContext context;
-    private final Consumer<T> delegate;
+    private final Supplier<T> delegate;
 
-    public MDCConsumer(Consumer<T> delegate) {
+    public MDCSupplier(Supplier<T> delegate) {
         this(MDCContext.current(), delegate);
     }
 
-    public MDCConsumer(MDCContext context, Consumer<T> delegate) {
+    public MDCSupplier(MDCContext context, Supplier<T> delegate) {
         this.context = context;
         this.delegate = delegate;
     }
 
     @Override
-    public void accept(T t) {
-        try {
-            context.put();
-            delegate.accept(t);
-        } finally {
-            context.clear();
+    public T get() {
+        try (MDCContext ignored = context.activate()) {
+            return delegate.get();
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java
index d42242aeaf..cf1b0d0345 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.tracing;
 
+import java.util.Comparator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -24,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -173,6 +175,36 @@ public class MDCTracer {
         return new MDCPredicate<>(context, delegate);
     }
 
+    public static <T> MDCComparator<T> tracing(Comparator<T> delegate) {
+        return tracing(MDCContext.current(), delegate);
+    }
+
+    public static <T> MDCComparator<T> tracing(Long jobId, Comparator<T> 
delegate) {
+        return tracing(MDCContext.of(jobId), delegate);
+    }
+
+    public static <T> MDCComparator<T> tracing(MDCContext context, 
Comparator<T> delegate) {
+        if (delegate instanceof MDCComparator) {
+            throw new IllegalArgumentException("Already an MDCComparator");
+        }
+        return new MDCComparator<>(context, delegate);
+    }
+
+    public static <T> MDCSupplier<T> tracing(Supplier<T> delegate) {
+        return tracing(MDCContext.current(), delegate);
+    }
+
+    public static <T> MDCSupplier<T> tracing(Long jobId, Supplier<T> delegate) 
{
+        return tracing(MDCContext.of(jobId), delegate);
+    }
+
+    public static <T> MDCSupplier<T> tracing(MDCContext context, Supplier<T> 
delegate) {
+        if (delegate instanceof MDCSupplier) {
+            throw new IllegalArgumentException("Already an MDCSupplier");
+        }
+        return new MDCSupplier<>(context, delegate);
+    }
+
     public static <T> MDCStream<T> tracing(Stream<T> delegate) {
         return tracing(MDCContext.current(), delegate);
     }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
index 9d8049f1fb..694bac8293 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
@@ -21,10 +21,14 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.MDC;
 
-import java.util.Arrays;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
-import java.util.stream.Stream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class MDCTracerTest {
 
@@ -82,6 +86,43 @@ public class MDCTracerTest {
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
     }
 
+    @Test
+    public void testMDCTracedSupplier() throws Exception {
+        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+        try (MDCContext ignored = MDCContext.of(1, 2, 3).activate()) {
+            Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+            Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+            Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+
+            CompletableFuture.supplyAsync(
+                            MDCTracer.tracing(
+                                    new Supplier<Object>() {
+                                        @Override
+                                        public Object get() {
+                                            Assertions.assertEquals(
+                                                    "1", 
MDC.get(MDCContext.JOB_ID));
+                                            Assertions.assertEquals(
+                                                    "2", 
MDC.get(MDCContext.PIPELINE_ID));
+                                            Assertions.assertEquals(
+                                                    "3", 
MDC.get(MDCContext.TASK_ID));
+                                            return null;
+                                        }
+                                    }))
+                    .get();
+
+            Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+            Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+            Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+        }
+
+        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+    }
+
     @Test
     public void testMDCTracedExecutorService() throws Exception {
         MDCContext mdcContext = MDCContext.of(1, 2, 3);
@@ -107,9 +148,6 @@ public class MDCTracerTest {
         Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
 
-        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
-        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
-        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
         tracedExecutorService
                 .submit(
                         new Callable<Void>() {
@@ -125,19 +163,108 @@ public class MDCTracerTest {
         Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
         Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+        MDCScheduledExecutorService tracedScheduledExecutorService =
+                MDCTracer.tracing(mdcContext, 
Executors.newSingleThreadScheduledExecutor());
+        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+        tracedScheduledExecutorService
+                .schedule(
+                        new Runnable() {
+                            @Override
+                            public void run() {
+                                Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                            }
+                        },
+                        1,
+                        TimeUnit.SECONDS)
+                .get();
+        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+        tracedScheduledExecutorService
+                .schedule(
+                        new Callable<Object>() {
+                            @Override
+                            public Object call() {
+                                Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                                return null;
+                            }
+                        },
+                        1,
+                        TimeUnit.SECONDS)
+                .get();
+        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+        CompletableFuture<Boolean> futureWithScheduleAtFixedRate = new 
CompletableFuture<>();
+        tracedScheduledExecutorService.scheduleAtFixedRate(
+                new Runnable() {
+                    AtomicInteger executeCount = new AtomicInteger(0);
+
+                    @Override
+                    public void run() {
+                        Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                        Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                        Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                        executeCount.incrementAndGet();
+                        if (executeCount.get() > 10 && 
!futureWithScheduleAtFixedRate.isDone()) {
+                            futureWithScheduleAtFixedRate.complete(true);
+                        }
+                    }
+                },
+                0,
+                10,
+                TimeUnit.MILLISECONDS);
+        futureWithScheduleAtFixedRate.join();
+
+        CompletableFuture<Boolean> futureWithScheduleAtFixedDelay = new 
CompletableFuture<>();
+        tracedScheduledExecutorService.scheduleWithFixedDelay(
+                new Runnable() {
+                    AtomicInteger executeCount = new AtomicInteger(0);
+
+                    @Override
+                    public void run() {
+                        Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                        Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                        Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                        executeCount.incrementAndGet();
+                        if (executeCount.get() > 10 && 
!futureWithScheduleAtFixedDelay.isDone()) {
+                            futureWithScheduleAtFixedDelay.complete(true);
+                        }
+                    }
+                },
+                0,
+                10,
+                TimeUnit.MILLISECONDS);
+        futureWithScheduleAtFixedDelay.join();
+
+        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
     }
 
     @Test
     public void testMDCTracedStream() throws Exception {
         MDCContext mdcContext = MDCContext.of(1, 2, 3);
 
-        Stream<Integer> tracedStream =
-                MDCTracer.tracing(mdcContext, Arrays.asList(1, 2, 
3).parallelStream());
-
         Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
         Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
-        tracedStream
+        MDCTracer.tracing(
+                        mdcContext,
+                        IntStream.range(1, 100)
+                                .boxed()
+                                .collect(Collectors.toList())
+                                .parallelStream())
                 .filter(
                         integer -> {
                             Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
@@ -169,58 +296,119 @@ public class MDCTracerTest {
         Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
 
-        tracedStream = MDCTracer.tracing(mdcContext, Arrays.asList(1, 2, 
3).stream());
+        try (MDCContext ignored = MDCContext.of(1, 2, 3).activate()) {
+            Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+            Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+            Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+
+            MDCTracer.tracing(
+                            IntStream.range(1, 100)
+                                    .boxed()
+                                    .collect(Collectors.toList())
+                                    .parallelStream())
+                    .filter(
+                            integer -> {
+                                Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                                return true;
+                            })
+                    .map(
+                            integer -> {
+                                Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                                return integer;
+                            })
+                    .sorted(
+                            (o1, o2) -> {
+                                Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                                return Integer.compare(o1, o2);
+                            })
+                    .forEach(
+                            integer -> {
+                                Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
+                            });
+
+            Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+            Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+            Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+        }
+
+        Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+        Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+        Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+        try (MDCContext ignored = MDCContext.of(1, 2, 3).activate()) {
+            Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+            Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+            Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+
+            mdcContext = MDCContext.of(4, 5, 6);
+            MDCTracer.tracing(
+                            mdcContext,
+                            IntStream.range(1, 100)
+                                    .boxed()
+                                    .collect(Collectors.toList())
+                                    .parallelStream())
+                    .filter(
+                            integer -> {
+                                Assertions.assertEquals("4", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("5", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("6", 
MDC.get(MDCContext.TASK_ID));
+                                return true;
+                            })
+                    .map(
+                            integer -> {
+                                Assertions.assertEquals("4", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("5", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("6", 
MDC.get(MDCContext.TASK_ID));
+                                return integer;
+                            })
+                    .sorted(
+                            (o1, o2) -> {
+                                Assertions.assertEquals("4", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("5", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("6", 
MDC.get(MDCContext.TASK_ID));
+                                return Integer.compare(o1, o2);
+                            })
+                    .forEach(
+                            integer -> {
+                                Assertions.assertEquals("4", 
MDC.get(MDCContext.JOB_ID));
+                                Assertions.assertEquals("5", 
MDC.get(MDCContext.PIPELINE_ID));
+                                Assertions.assertEquals("6", 
MDC.get(MDCContext.TASK_ID));
+                            });
+
+            Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+            Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+            Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+        }
 
         Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
         Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
-        tracedStream
-                .filter(
-                        integer -> {
-                            Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
-                            Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
-                            Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
-                            return true;
-                        })
-                .map(
-                        integer -> {
-                            Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
-                            Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
-                            Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
-                            return integer;
-                        })
-                .sorted(
-                        (o1, o2) -> {
-                            Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
-                            Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
-                            Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
-                            return Integer.compare(o1, o2);
-                        })
-                .forEach(
-                        integer -> {
-                            Assertions.assertEquals("1", 
MDC.get(MDCContext.JOB_ID));
-                            Assertions.assertEquals("2", 
MDC.get(MDCContext.PIPELINE_ID));
-                            Assertions.assertEquals("3", 
MDC.get(MDCContext.TASK_ID));
-                        });
     }
 
     @Test
     public void testMDCContext() throws Exception {
-        MDCContext.current();
         Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
         Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
 
         MDCContext mdcContext = MDCContext.of(1, 2, 3);
-        mdcContext.put();
-        Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
-        Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
-        Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+        try (MDCContext ignored = mdcContext.activate()) {
+            Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+            Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+            Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
 
-        MDCContext currentMDCCOntext = MDCContext.current();
-        Assertions.assertEquals(mdcContext, currentMDCCOntext);
+            MDCContext currentMDCCOntext = MDCContext.current();
+            Assertions.assertEquals(mdcContext, currentMDCCOntext);
+        }
 
-        mdcContext.clear();
         Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
         Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
         Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
index 08d859a4b1..3d088e9d93 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
@@ -38,12 +38,8 @@ public abstract class TracingOperation extends Operation {
 
     @Override
     public final void run() throws Exception {
-        try {
-            context.put();
-
+        try (MDCContext ignored = context.activate()) {
             runInternal();
-        } finally {
-            context.clear();
         }
     }
 

Reply via email to