This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new adb656e298 [Hotfix][Zeta][Log] Fix zeta parallelStream log trace for job (#8456)
adb656e298 is described below
commit adb656e29815e36b8bbcc383e7b400bc38f050c9
Author: hailin0 <[email protected]>
AuthorDate: Tue Jan 7 10:26:44 2025 +0800
[Hotfix][Zeta][Log] Fix zeta parallelStream log trace for job (#8456)
---
.../apache/seatunnel/api/tracing/MDCCallable.java | 15 +-
.../seatunnel/api/tracing/MDCComparator.java | 14 +-
.../apache/seatunnel/api/tracing/MDCConsumer.java | 14 +-
.../apache/seatunnel/api/tracing/MDCContext.java | 153 +++++++----
.../apache/seatunnel/api/tracing/MDCExecutor.java | 2 +-
.../seatunnel/api/tracing/MDCExecutorService.java | 14 +-
.../apache/seatunnel/api/tracing/MDCFunction.java | 14 +-
.../apache/seatunnel/api/tracing/MDCPredicate.java | 14 +-
.../apache/seatunnel/api/tracing/MDCRunnable.java | 16 +-
.../api/tracing/MDCScheduledExecutorService.java | 10 +-
.../apache/seatunnel/api/tracing/MDCStream.java | 37 +--
.../tracing/{MDCConsumer.java => MDCSupplier.java} | 19 +-
.../apache/seatunnel/api/tracing/MDCTracer.java | 32 +++
.../seatunnel/api/tracing/MDCTracerTest.java | 280 +++++++++++++++++----
.../server/task/operation/TracingOperation.java | 6 +-
15 files changed, 466 insertions(+), 174 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java
index f3cae160da..b56101e8c7 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCCallable.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.tracing;
import java.util.concurrent.Callable;
+import java.util.function.Supplier;
/**
* Callable that sets MDC context before calling the delegate and clears it
afterwards.
@@ -25,7 +26,7 @@ import java.util.concurrent.Callable;
* @param <V>
*/
public class MDCCallable<V> implements Callable<V> {
- private final MDCContext context;
+ private final Supplier<MDCContext> contextSupplier;
private final Callable<V> delegate;
public MDCCallable(Callable<V> delegate) {
@@ -33,18 +34,18 @@ public class MDCCallable<V> implements Callable<V> {
}
public MDCCallable(MDCContext context, Callable<V> delegate) {
- this.context = context;
+ this(() -> context, delegate);
+ }
+
+ public MDCCallable(Supplier<MDCContext> contextSupplier, Callable<V>
delegate) {
+ this.contextSupplier = contextSupplier;
this.delegate = delegate;
}
@Override
public V call() throws Exception {
- try {
- context.put();
-
+ try (MDCContext ignored = contextSupplier.get().activate()) {
return delegate.call();
- } finally {
- context.clear();
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
index 5672cf4392..8921b5356b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCComparator.java
@@ -18,9 +18,10 @@
package org.apache.seatunnel.api.tracing;
import java.util.Comparator;
+import java.util.function.Supplier;
public class MDCComparator<T> implements Comparator<T> {
- private final MDCContext context;
+ private final Supplier<MDCContext> contextSupplier;
private final Comparator<T> delegate;
public MDCComparator(Comparator<T> delegate) {
@@ -28,17 +29,18 @@ public class MDCComparator<T> implements Comparator<T> {
}
public MDCComparator(MDCContext context, Comparator<T> delegate) {
- this.context = context;
+ this(() -> context, delegate);
+ }
+
+ public MDCComparator(Supplier<MDCContext> contextSupplier, Comparator<T>
delegate) {
+ this.contextSupplier = contextSupplier;
this.delegate = delegate;
}
@Override
public int compare(T o1, T o2) {
- try {
- context.put();
+ try (MDCContext ignored = contextSupplier.get().activate()) {
return delegate.compare(o1, o2);
- } finally {
- context.clear();
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
index 6317f038a0..ecfb3b5a66 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
@@ -18,9 +18,10 @@
package org.apache.seatunnel.api.tracing;
import java.util.function.Consumer;
+import java.util.function.Supplier;
public class MDCConsumer<T> implements Consumer<T> {
- private final MDCContext context;
+ private final Supplier<MDCContext> contextSupplier;
private final Consumer<T> delegate;
public MDCConsumer(Consumer<T> delegate) {
@@ -28,17 +29,18 @@ public class MDCConsumer<T> implements Consumer<T> {
}
public MDCConsumer(MDCContext context, Consumer<T> delegate) {
- this.context = context;
+ this(() -> context, delegate);
+ }
+
+ public MDCConsumer(Supplier<MDCContext> contextSupplier, Consumer<T>
delegate) {
+ this.contextSupplier = contextSupplier;
this.delegate = delegate;
}
@Override
public void accept(T t) {
- try {
- context.put();
+ try (MDCContext ignored = contextSupplier.get().activate()) {
delegate.accept(t);
- } finally {
- context.clear();
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java
index 4a43937459..e343e5be1b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCContext.java
@@ -19,22 +19,37 @@ package org.apache.seatunnel.api.tracing;
import org.slf4j.MDC;
-import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
+import java.io.Closeable;
import java.io.Serializable;
/**
* MDC context for tracing.
*
* <p>reference: https://www.slf4j.org/manual.html#mdc
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * try (MDCContext ctx = MDCContext.of(jobId, pipelineId,
taskId).activate()) {
+ * // do something
+ * new Thread(new MDCRunnable(MDCContext.current(), new Runnable() {
+ * @Override
+ * public void run() {
+ * // do something
+ * }
+ * }))
+ * .start();
+ * }
+ * // MDC context will be restored after the try block
+ * </pre>
*/
@Slf4j
-@Builder
@EqualsAndHashCode
-public class MDCContext implements Serializable {
- private static final MDCContext EMPTY = MDCContext.builder().build();
+public class MDCContext implements Serializable, Closeable {
+ private static final MDCContext EMPTY = new MDCContext(null, null, null);
private static final String EMPTY_TO_STRING = "NA";
public static final String JOB_ID = "ST-JID";
@@ -44,55 +59,24 @@ public class MDCContext implements Serializable {
private final Long jobId;
private final Long pipelineId;
private final Long taskId;
+ private transient volatile MDCContext toRestore;
- public static MDCContext of(long jobId) {
- return MDCContext.builder().jobId(jobId).build();
- }
-
- public static MDCContext of(long jobId, long pipelineId) {
- return
MDCContext.builder().jobId(jobId).pipelineId(pipelineId).build();
- }
-
- public static MDCContext of(long jobId, long pipelineId, long taskId) {
- return
MDCContext.builder().jobId(jobId).pipelineId(pipelineId).taskId(taskId).build();
- }
-
- public static MDCContext current() {
- return MDCContext.builder()
- .jobId(MDC.get(JOB_ID) != null ?
Long.parseLong(MDC.get(JOB_ID)) : null)
- .pipelineId(
- MDC.get(PIPELINE_ID) != null ?
Long.parseLong(MDC.get(PIPELINE_ID)) : null)
- .taskId(MDC.get(TASK_ID) != null ?
Long.parseLong(MDC.get(TASK_ID)) : null)
- .build();
+ public MDCContext(Long jobId, Long pipelineId, Long taskId) {
+ this.jobId = jobId;
+ this.pipelineId = pipelineId;
+ this.taskId = taskId;
}
- public static MDCContext valueOf(String s) {
- if (EMPTY_TO_STRING.equals(s)) {
- return EMPTY;
+ public synchronized MDCContext activate() {
+ if (this == EMPTY) {
+ return this;
}
- String[] arr = s.split("/");
- Long jobId = Long.parseLong(arr[0]);
- Long pipelineId = Long.parseLong(arr[1]);
- Long taskId = Long.parseLong(arr[2]);
- if (pipelineId == 0 || taskId == 0) {
- return MDCContext.of(jobId);
- }
- return MDCContext.of(jobId, pipelineId, taskId);
- }
-
- @Override
- public String toString() {
- if (jobId != null) {
- return String.format(
- "%d/%d/%d",
- jobId, pipelineId == null ? 0 : pipelineId, taskId == null
? 0 : taskId);
- } else {
- return EMPTY_TO_STRING;
+ if (this.toRestore != null) {
+ throw new IllegalStateException("MDCContext is already activated");
}
- }
+ this.toRestore = current();
- public void put() {
try {
if (jobId != null) {
MDC.put(JOB_ID, String.valueOf(jobId));
@@ -107,9 +91,18 @@ public class MDCContext implements Serializable {
log.error("Failed to put MDC context", e);
throw e;
}
+ return this;
}
- public void clear() {
+ public synchronized MDCContext deactivate() {
+ if (this == EMPTY) {
+ return this;
+ }
+
+ if (this.toRestore == null) {
+ throw new IllegalStateException("MDCContext is not activated");
+ }
+
try {
MDC.remove(JOB_ID);
MDC.remove(PIPELINE_ID);
@@ -118,5 +111,71 @@ public class MDCContext implements Serializable {
log.error("Failed to clear MDC context", e);
throw e;
}
+
+ if (this.toRestore != null) {
+ this.toRestore.activate();
+ }
+
+ return this;
+ }
+
+ @Override
+ public void close() {
+ deactivate();
+ }
+
+ @Override
+ public String toString() {
+ if (this == EMPTY) {
+ return EMPTY_TO_STRING;
+ }
+ return String.format(
+ "%d/%d/%d",
+ jobId, pipelineId == null ? 0 : pipelineId, taskId == null ? 0
: taskId);
+ }
+
+ public static MDCContext of(long jobId) {
+ return new MDCContext(jobId, null, null);
+ }
+
+ public static MDCContext of(long jobId, long pipelineId) {
+ return new MDCContext(jobId, pipelineId, null);
+ }
+
+ public static MDCContext of(long jobId, long pipelineId, long taskId) {
+ return new MDCContext(jobId, pipelineId, taskId);
+ }
+
+ public static MDCContext of(MDCContext context) {
+ return new MDCContext(context.jobId, context.pipelineId,
context.taskId);
+ }
+
+ public static MDCContext current() {
+ String jobId = MDC.get(JOB_ID);
+ if (jobId == null) {
+ return EMPTY;
+ }
+
+ String pipelineId = MDC.get(PIPELINE_ID);
+ String taskId = MDC.get(TASK_ID);
+ return new MDCContext(
+ Long.parseLong(jobId),
+ pipelineId != null ? Long.parseLong(pipelineId) : null,
+ taskId != null ? Long.parseLong(taskId) : null);
+ }
+
+ public static MDCContext valueOf(String s) {
+ if (EMPTY_TO_STRING.equals(s)) {
+ return EMPTY;
+ }
+
+ String[] arr = s.split("/");
+ Long jobId = Long.parseLong(arr[0]);
+ Long pipelineId = Long.parseLong(arr[1]);
+ Long taskId = Long.parseLong(arr[2]);
+ if (pipelineId == 0 || taskId == 0) {
+ return MDCContext.of(jobId);
+ }
+ return MDCContext.of(jobId, pipelineId, taskId);
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java
index 1651f7d6c4..e6d02df78e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutor.java
@@ -31,6 +31,6 @@ public class MDCExecutor implements Executor {
@Override
public void execute(Runnable command) {
- delegate.execute(new MDCRunnable(context, command));
+ delegate.execute(new MDCRunnable(MDCContext.of(context), command));
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
index 7ef93f41df..f010dfe717 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCExecutorService.java
@@ -65,17 +65,17 @@ public class MDCExecutorService extends MDCExecutor
implements ExecutorService {
@Override
public <T> Future<T> submit(Callable<T> task) {
- return delegate.submit(new MDCCallable<>(context, task));
+ return delegate.submit(new MDCCallable<>(MDCContext.of(context),
task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
- return delegate.submit(new MDCRunnable(context, task), result);
+ return delegate.submit(new MDCRunnable(MDCContext.of(context), task),
result);
}
@Override
public Future<?> submit(Runnable task) {
- return delegate.submit(new MDCRunnable(context, task));
+ return delegate.submit(new MDCRunnable(MDCContext.of(context), task));
}
@Override
@@ -83,7 +83,7 @@ public class MDCExecutorService extends MDCExecutor
implements ExecutorService {
throws InterruptedException {
return delegate.invokeAll(
tasks.stream()
- .map(task -> new MDCCallable<>(context, task))
+ .map(task -> new MDCCallable<>(MDCContext.of(context),
task))
.collect(Collectors.toList()));
}
@@ -93,7 +93,7 @@ public class MDCExecutorService extends MDCExecutor
implements ExecutorService {
throws InterruptedException {
return delegate.invokeAll(
tasks.stream()
- .map(task -> new MDCCallable<>(context, task))
+ .map(task -> new MDCCallable<>(MDCContext.of(context),
task))
.collect(Collectors.toList()),
timeout,
unit);
@@ -104,7 +104,7 @@ public class MDCExecutorService extends MDCExecutor
implements ExecutorService {
throws InterruptedException, ExecutionException {
return delegate.invokeAny(
tasks.stream()
- .map(task -> new MDCCallable<>(context, task))
+ .map(task -> new MDCCallable<>(MDCContext.of(context),
task))
.collect(Collectors.toList()));
}
@@ -113,7 +113,7 @@ public class MDCExecutorService extends MDCExecutor
implements ExecutorService {
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(
tasks.stream()
- .map(task -> new MDCCallable<>(context, task))
+ .map(task -> new MDCCallable<>(MDCContext.of(context),
task))
.collect(Collectors.toList()),
timeout,
unit);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java
index b8f0d22248..c4dc2463c4 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCFunction.java
@@ -18,9 +18,10 @@
package org.apache.seatunnel.api.tracing;
import java.util.function.Function;
+import java.util.function.Supplier;
public class MDCFunction<T, R> implements Function<T, R> {
- private final MDCContext context;
+ private final Supplier<MDCContext> contextSupplier;
protected final Function<T, R> delegate;
public MDCFunction(Function<T, R> delegate) {
@@ -28,17 +29,18 @@ public class MDCFunction<T, R> implements Function<T, R> {
}
public MDCFunction(MDCContext context, Function<T, R> delegate) {
- this.context = context;
+ this(() -> context, delegate);
+ }
+
+ public MDCFunction(Supplier<MDCContext> contextSupplier, Function<T, R>
delegate) {
+ this.contextSupplier = contextSupplier;
this.delegate = delegate;
}
@Override
public R apply(T t) {
- try {
- context.put();
+ try (MDCContext ignored = contextSupplier.get().activate()) {
return delegate.apply(t);
- } finally {
- context.clear();
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
index d04edc064b..d01c290b57 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCPredicate.java
@@ -18,9 +18,10 @@
package org.apache.seatunnel.api.tracing;
import java.util.function.Predicate;
+import java.util.function.Supplier;
public class MDCPredicate<T> implements Predicate<T> {
- private final MDCContext context;
+ private final Supplier<MDCContext> contextSupplier;
private final Predicate<T> delegate;
public MDCPredicate(Predicate<T> delegate) {
@@ -28,17 +29,18 @@ public class MDCPredicate<T> implements Predicate<T> {
}
public MDCPredicate(MDCContext context, Predicate<T> delegate) {
- this.context = context;
+ this(() -> context, delegate);
+ }
+
+ public MDCPredicate(Supplier<MDCContext> contextSupplier, Predicate<T>
delegate) {
+ this.contextSupplier = contextSupplier;
this.delegate = delegate;
}
@Override
public boolean test(T t) {
- try {
- context.put();
+ try (MDCContext ignored = contextSupplier.get().activate()) {
return delegate.test(t);
- } finally {
- context.clear();
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java
index e6d310de10..02544db696 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCRunnable.java
@@ -17,9 +17,11 @@
package org.apache.seatunnel.api.tracing;
+import java.util.function.Supplier;
+
/** Runnable that sets MDC context before calling the delegate and clears it
afterwards. */
public class MDCRunnable implements Runnable {
- private final MDCContext context;
+ private final Supplier<MDCContext> contextSupplier;
private final Runnable delegate;
public MDCRunnable(Runnable delegate) {
@@ -27,18 +29,18 @@ public class MDCRunnable implements Runnable {
}
public MDCRunnable(MDCContext context, Runnable delegate) {
- this.context = context;
+ this(() -> context, delegate);
+ }
+
+ public MDCRunnable(Supplier<MDCContext> contextSupplier, Runnable
delegate) {
+ this.contextSupplier = contextSupplier;
this.delegate = delegate;
}
@Override
public void run() {
- try {
- context.put();
-
+ try (MDCContext ignored = contextSupplier.get().activate()) {
delegate.run();
- } finally {
- context.clear();
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
index 804e953ace..ed43a9fa9c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCScheduledExecutorService.java
@@ -39,25 +39,27 @@ public class MDCScheduledExecutorService extends
MDCExecutorService
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit
unit) {
- return delegate.schedule(new MDCRunnable(context, command), delay,
unit);
+ return delegate.schedule(
+ new MDCRunnable(() -> MDCContext.of(context), command), delay,
unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
- return delegate.schedule(new MDCCallable<>(context, callable), delay,
unit);
+ return delegate.schedule(
+ new MDCCallable<>(() -> MDCContext.of(context), callable),
delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate.scheduleAtFixedRate(
- new MDCRunnable(context, command), initialDelay, period, unit);
+ new MDCRunnable(() -> MDCContext.of(context), command),
initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate.scheduleWithFixedDelay(
- new MDCRunnable(context, command), initialDelay, delay, unit);
+ new MDCRunnable(() -> MDCContext.of(context), command),
initialDelay, delay, unit);
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java
index 871af5cd60..3f85d55c79 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCStream.java
@@ -53,62 +53,69 @@ public class MDCStream<T> implements Stream<T> {
@Override
public Stream<T> filter(Predicate<? super T> predicate) {
- return new MDCStream<>(context, delegate.filter(new
MDCPredicate<>(context, predicate)));
+ return new MDCStream<>(
+ context,
+ delegate.filter(new MDCPredicate<>(() ->
MDCContext.of(context), predicate)));
}
@Override
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
- return new MDCStream<>(context, delegate.map(new
MDCFunction<>(context, mapper)));
+ return new MDCStream<>(
+ context, delegate.map(new MDCFunction<>(() ->
MDCContext.of(context), mapper)));
}
@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<?
extends R>> mapper) {
- return new MDCStream<>(context, delegate.flatMap(new
MDCFunction<>(context, mapper)));
+ return new MDCStream<>(
+ context, delegate.flatMap(new MDCFunction<>(() ->
MDCContext.of(context), mapper)));
}
@Override
public Stream<T> sorted(Comparator<? super T> comparator) {
- return new MDCStream<>(context, delegate.sorted(new
MDCComparator<>(context, comparator)));
+ return new MDCStream<>(
+ context,
+ delegate.sorted(new MDCComparator<>(() ->
MDCContext.of(context), comparator)));
}
@Override
public Stream<T> peek(Consumer<? super T> action) {
- return new MDCStream<>(context, delegate.peek(new
MDCConsumer<>(context, action)));
+ return new MDCStream<>(
+ context, delegate.peek(new MDCConsumer<>(() ->
MDCContext.of(context), action)));
}
@Override
public void forEach(Consumer<? super T> action) {
- delegate.forEach(new MDCConsumer<>(context, action));
+ delegate.forEach(new MDCConsumer<>(() -> MDCContext.of(context),
action));
}
@Override
public void forEachOrdered(Consumer<? super T> action) {
- delegate.forEachOrdered(new MDCConsumer<>(context, action));
+ delegate.forEachOrdered(new MDCConsumer<>(() ->
MDCContext.of(context), action));
}
@Override
public Optional<T> min(Comparator<? super T> comparator) {
- return delegate.min(new MDCComparator<>(context, comparator));
+ return delegate.min(new MDCComparator<>(() -> MDCContext.of(context),
comparator));
}
@Override
public Optional<T> max(Comparator<? super T> comparator) {
- return delegate.max(new MDCComparator<>(context, comparator));
+ return delegate.max(new MDCComparator<>(() -> MDCContext.of(context),
comparator));
}
@Override
public boolean anyMatch(Predicate<? super T> predicate) {
- return delegate.anyMatch(new MDCPredicate<>(context, predicate));
+ return delegate.anyMatch(new MDCPredicate<>(() ->
MDCContext.of(context), predicate));
}
@Override
public boolean allMatch(Predicate<? super T> predicate) {
- return delegate.allMatch(new MDCPredicate<>(context, predicate));
+ return delegate.allMatch(new MDCPredicate<>(() ->
MDCContext.of(context), predicate));
}
@Override
public boolean noneMatch(Predicate<? super T> predicate) {
- return delegate.noneMatch(new MDCPredicate<>(context, predicate));
+ return delegate.noneMatch(new MDCPredicate<>(() ->
MDCContext.of(context), predicate));
}
@Override
@@ -153,17 +160,17 @@ public class MDCStream<T> implements Stream<T> {
@Override
public IntStream flatMapToInt(Function<? super T, ? extends IntStream>
mapper) {
- return delegate.flatMapToInt(new MDCFunction<>(context, mapper));
+ return delegate.flatMapToInt(new MDCFunction<>(() ->
MDCContext.of(context), mapper));
}
@Override
public LongStream flatMapToLong(Function<? super T, ? extends LongStream>
mapper) {
- return delegate.flatMapToLong(new MDCFunction<>(context, mapper));
+ return delegate.flatMapToLong(new MDCFunction<>(() ->
MDCContext.of(context), mapper));
}
@Override
public DoubleStream flatMapToDouble(Function<? super T, ? extends
DoubleStream> mapper) {
- return delegate.flatMapToDouble(new MDCFunction<>(context, mapper));
+ return delegate.flatMapToDouble(new MDCFunction<>(() ->
MDCContext.of(context), mapper));
}
@Override
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCSupplier.java
similarity index 72%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCSupplier.java
index 6317f038a0..6cd3c8e8c6 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCConsumer.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCSupplier.java
@@ -17,28 +17,25 @@
package org.apache.seatunnel.api.tracing;
-import java.util.function.Consumer;
+import java.util.function.Supplier;
-public class MDCConsumer<T> implements Consumer<T> {
+public class MDCSupplier<T> implements Supplier<T> {
private final MDCContext context;
- private final Consumer<T> delegate;
+ private final Supplier<T> delegate;
- public MDCConsumer(Consumer<T> delegate) {
+ public MDCSupplier(Supplier<T> delegate) {
this(MDCContext.current(), delegate);
}
- public MDCConsumer(MDCContext context, Consumer<T> delegate) {
+ public MDCSupplier(MDCContext context, Supplier<T> delegate) {
this.context = context;
this.delegate = delegate;
}
@Override
- public void accept(T t) {
- try {
- context.put();
- delegate.accept(t);
- } finally {
- context.clear();
+ public T get() {
+ try (MDCContext ignored = context.activate()) {
+ return delegate.get();
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java
index d42242aeaf..cf1b0d0345 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/tracing/MDCTracer.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.tracing;
+import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -24,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Stream;
/**
@@ -173,6 +175,36 @@ public class MDCTracer {
return new MDCPredicate<>(context, delegate);
}
+ public static <T> MDCComparator<T> tracing(Comparator<T> delegate) {
+ return tracing(MDCContext.current(), delegate);
+ }
+
+ public static <T> MDCComparator<T> tracing(Long jobId, Comparator<T>
delegate) {
+ return tracing(MDCContext.of(jobId), delegate);
+ }
+
+ public static <T> MDCComparator<T> tracing(MDCContext context,
Comparator<T> delegate) {
+ if (delegate instanceof MDCComparator) {
+ throw new IllegalArgumentException("Already an MDCComparator");
+ }
+ return new MDCComparator<>(context, delegate);
+ }
+
+ public static <T> MDCSupplier<T> tracing(Supplier<T> delegate) {
+ return tracing(MDCContext.current(), delegate);
+ }
+
+ public static <T> MDCSupplier<T> tracing(Long jobId, Supplier<T> delegate)
{
+ return tracing(MDCContext.of(jobId), delegate);
+ }
+
+ public static <T> MDCSupplier<T> tracing(MDCContext context, Supplier<T>
delegate) {
+ if (delegate instanceof MDCSupplier) {
+ throw new IllegalArgumentException("Already an MDCSupplier");
+ }
+ return new MDCSupplier<>(context, delegate);
+ }
+
public static <T> MDCStream<T> tracing(Stream<T> delegate) {
return tracing(MDCContext.current(), delegate);
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
index 9d8049f1fb..694bac8293 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/tracing/MDCTracerTest.java
@@ -21,10 +21,14 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.MDC;
-import java.util.Arrays;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
-import java.util.stream.Stream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class MDCTracerTest {
@@ -82,6 +86,43 @@ public class MDCTracerTest {
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
}
+ @Test
+ public void testMDCTracedSupplier() throws Exception {
+ Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+ Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+ try (MDCContext ignored = MDCContext.of(1, 2, 3).activate()) {
+ Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+
+ CompletableFuture.supplyAsync(
+ MDCTracer.tracing(
+ new Supplier<Object>() {
+ @Override
+ public Object get() {
+ Assertions.assertEquals(
+ "1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals(
+ "2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals(
+ "3",
MDC.get(MDCContext.TASK_ID));
+ return null;
+ }
+ }))
+ .get();
+
+ Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+ }
+
+ Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+ Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+ }
+
@Test
public void testMDCTracedExecutorService() throws Exception {
MDCContext mdcContext = MDCContext.of(1, 2, 3);
@@ -107,9 +148,6 @@ public class MDCTracerTest {
Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
- Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
- Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
- Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
tracedExecutorService
.submit(
new Callable<Void>() {
@@ -125,19 +163,108 @@ public class MDCTracerTest {
Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+ MDCScheduledExecutorService tracedScheduledExecutorService =
+ MDCTracer.tracing(mdcContext,
Executors.newSingleThreadScheduledExecutor());
+ Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+ Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+ tracedScheduledExecutorService
+ .schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ }
+ },
+ 1,
+ TimeUnit.SECONDS)
+ .get();
+ Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+ Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+ tracedScheduledExecutorService
+ .schedule(
+ new Callable<Object>() {
+ @Override
+ public Object call() {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ return null;
+ }
+ },
+ 1,
+ TimeUnit.SECONDS)
+ .get();
+ Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+ Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+ CompletableFuture<Boolean> futureWithScheduleAtFixedRate = new
CompletableFuture<>();
+ tracedScheduledExecutorService.scheduleAtFixedRate(
+ new Runnable() {
+ AtomicInteger executeCount = new AtomicInteger(0);
+
+ @Override
+ public void run() {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ executeCount.incrementAndGet();
+ if (executeCount.get() > 10 &&
!futureWithScheduleAtFixedRate.isDone()) {
+ futureWithScheduleAtFixedRate.complete(true);
+ }
+ }
+ },
+ 0,
+ 10,
+ TimeUnit.MILLISECONDS);
+ futureWithScheduleAtFixedRate.join();
+
+ CompletableFuture<Boolean> futureWithScheduleAtFixedDelay = new
CompletableFuture<>();
+ tracedScheduledExecutorService.scheduleWithFixedDelay(
+ new Runnable() {
+ AtomicInteger executeCount = new AtomicInteger(0);
+
+ @Override
+ public void run() {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ executeCount.incrementAndGet();
+ if (executeCount.get() > 10 &&
!futureWithScheduleAtFixedDelay.isDone()) {
+ futureWithScheduleAtFixedDelay.complete(true);
+ }
+ }
+ },
+ 0,
+ 10,
+ TimeUnit.MILLISECONDS);
+ futureWithScheduleAtFixedDelay.join();
+
+ Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+ Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
}
@Test
public void testMDCTracedStream() throws Exception {
MDCContext mdcContext = MDCContext.of(1, 2, 3);
- Stream<Integer> tracedStream =
- MDCTracer.tracing(mdcContext, Arrays.asList(1, 2,
3).parallelStream());
-
Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
- tracedStream
+ MDCTracer.tracing(
+ mdcContext,
+ IntStream.range(1, 100)
+ .boxed()
+ .collect(Collectors.toList())
+ .parallelStream())
.filter(
integer -> {
Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
@@ -169,58 +296,119 @@ public class MDCTracerTest {
Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
- tracedStream = MDCTracer.tracing(mdcContext, Arrays.asList(1, 2,
3).stream());
+ try (MDCContext ignored = MDCContext.of(1, 2, 3).activate()) {
+ Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+
+ MDCTracer.tracing(
+ IntStream.range(1, 100)
+ .boxed()
+ .collect(Collectors.toList())
+ .parallelStream())
+ .filter(
+ integer -> {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ return true;
+ })
+ .map(
+ integer -> {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ return integer;
+ })
+ .sorted(
+ (o1, o2) -> {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ return Integer.compare(o1, o2);
+ })
+ .forEach(
+ integer -> {
+ Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
+ });
+
+ Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+ }
+
+ Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
+ Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
+
+ try (MDCContext ignored = MDCContext.of(1, 2, 3).activate()) {
+ Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+
+ mdcContext = MDCContext.of(4, 5, 6);
+ MDCTracer.tracing(
+ mdcContext,
+ IntStream.range(1, 100)
+ .boxed()
+ .collect(Collectors.toList())
+ .parallelStream())
+ .filter(
+ integer -> {
+ Assertions.assertEquals("4",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("5",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("6",
MDC.get(MDCContext.TASK_ID));
+ return true;
+ })
+ .map(
+ integer -> {
+ Assertions.assertEquals("4",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("5",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("6",
MDC.get(MDCContext.TASK_ID));
+ return integer;
+ })
+ .sorted(
+ (o1, o2) -> {
+ Assertions.assertEquals("4",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("5",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("6",
MDC.get(MDCContext.TASK_ID));
+ return Integer.compare(o1, o2);
+ })
+ .forEach(
+ integer -> {
+ Assertions.assertEquals("4",
MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("5",
MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("6",
MDC.get(MDCContext.TASK_ID));
+ });
+
+ Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+ }
Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
- tracedStream
- .filter(
- integer -> {
- Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
- Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
- Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
- return true;
- })
- .map(
- integer -> {
- Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
- Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
- Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
- return integer;
- })
- .sorted(
- (o1, o2) -> {
- Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
- Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
- Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
- return Integer.compare(o1, o2);
- })
- .forEach(
- integer -> {
- Assertions.assertEquals("1",
MDC.get(MDCContext.JOB_ID));
- Assertions.assertEquals("2",
MDC.get(MDCContext.PIPELINE_ID));
- Assertions.assertEquals("3",
MDC.get(MDCContext.TASK_ID));
- });
}
@Test
public void testMDCContext() throws Exception {
- MDCContext.current();
Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
MDCContext mdcContext = MDCContext.of(1, 2, 3);
- mdcContext.put();
- Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
- Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
- Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
+ try (MDCContext ignored = mdcContext.activate()) {
+ Assertions.assertEquals("1", MDC.get(MDCContext.JOB_ID));
+ Assertions.assertEquals("2", MDC.get(MDCContext.PIPELINE_ID));
+ Assertions.assertEquals("3", MDC.get(MDCContext.TASK_ID));
- MDCContext currentMDCCOntext = MDCContext.current();
- Assertions.assertEquals(mdcContext, currentMDCCOntext);
+ MDCContext currentMDCCOntext = MDCContext.current();
+ Assertions.assertEquals(mdcContext, currentMDCCOntext);
+ }
- mdcContext.clear();
Assertions.assertNull(MDC.get(MDCContext.JOB_ID));
Assertions.assertNull(MDC.get(MDCContext.PIPELINE_ID));
Assertions.assertNull(MDC.get(MDCContext.TASK_ID));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
index 08d859a4b1..3d088e9d93 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/TracingOperation.java
@@ -38,12 +38,8 @@ public abstract class TracingOperation extends Operation {
@Override
public final void run() throws Exception {
- try {
- context.put();
-
+ try (MDCContext ignored = context.activate()) {
runInternal();
- } finally {
- context.clear();
}
}