This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 976a8ca16 [Hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics (#3660)
976a8ca16 is described below
commit 976a8ca16a14590a7618b622127020cea101c938
Author: ic4y <[email protected]>
AuthorDate: Thu Dec 8 17:31:54 2022 +0800
[Hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics (#3660)
* [hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics
* [hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics
---
.../engine/server/TaskExecutionService.java | 8 ---
.../seatunnel/engine/server/metrics/Metrics.java | 55 -----------------
.../engine/server/metrics/MetricsContext.java | 69 ++++++++++++++++-----
.../engine/server/metrics/MetricsImpl.java | 72 ----------------------
.../server/task/SeaTunnelSourceCollector.java | 11 ++--
.../engine/server/task/SeaTunnelTask.java | 5 +-
.../engine/server/task/SourceSeaTunnelTask.java | 2 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 12 ++--
8 files changed, 74 insertions(+), 160 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index ad6a592e4..37b3e93e9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -45,7 +45,6 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import org.apache.seatunnel.engine.server.metrics.MetricsImpl;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
@@ -340,11 +339,9 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
final Task t = tracker.task;
- MetricsImpl.Container userMetricsContextContainer =
MetricsImpl.container();
try {
startedLatch.countDown();
t.init();
- userMetricsContextContainer.setContext(t.getMetricsContext());
ProgressState result;
do {
result = t.call();
@@ -355,7 +352,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
taskGroupExecutionTracker.exception(e);
} finally {
taskGroupExecutionTracker.taskDone();
- userMetricsContextContainer.setContext(null);
}
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
@@ -381,7 +377,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
public AtomicReference<TaskTracker> exclusiveTaskTracker = new
AtomicReference<>();
final TaskCallTimer timer;
private Thread myThread;
- private MetricsImpl.Container userMetricsContextContainer;
public LinkedBlockingDeque<TaskTracker> taskqueue;
@SuppressWarnings("checkstyle:MagicNumber")
@@ -396,7 +391,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
@Override
public void run() {
myThread = currentThread();
- userMetricsContextContainer = MetricsImpl.container();
while (keep.get() && isRunning) {
TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
exclusiveTaskTracker.get() :
@@ -420,7 +414,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
try {
//run task
myThread.setContextClassLoader(executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader());
-
userMetricsContextContainer.setContext(taskTracker.task.getMetricsContext());
call = taskTracker.task.call();
synchronized (timer) {
timer.timerStop();
@@ -437,7 +430,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
} finally {
//stop timer
timer.timerStop();
- userMetricsContextContainer.setContext(null);
}
//task call finished
if (null != call) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
deleted file mode 100644
index 72a8b2e67..000000000
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.metrics;
-
-import org.apache.seatunnel.api.common.metrics.Metric;
-import org.apache.seatunnel.api.common.metrics.Unit;
-
-public final class Metrics {
-
- private Metrics() {
- }
-
- public static Metric metric(String name) {
- return MetricsImpl.metric(name, Unit.COUNT);
- }
-
- /**
- * Same as {@link #metric(String)}, but allows us to also specify the
- * measurement {@link Unit} of the metric.
- */
- public static Metric metric(String name, Unit unit) {
- return MetricsImpl.metric(name, unit);
- }
-
- public static Metric qpsMetric(String name, Unit unit) {
- return MetricsImpl.qpsMetric(name, unit);
- }
-
- public static Metric threadSafeMetric(String name) {
- return MetricsImpl.threadSafeMetric(name, Unit.COUNT);
- }
-
- /**
- * Same as {@link #threadSafeMetric(String)}, but allows us to also
- * specify the measurement {@link Unit} of the metric.
- */
- public static Metric threadSafeMetric(String name, Unit unit) {
- return MetricsImpl.threadSafeMetric(name, unit);
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
index f54e02f79..be8480229 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
@@ -36,23 +36,27 @@ public class MetricsContext implements
DynamicMetricsProvider {
private static final BiFunction<String, Unit, AbstractMetric>
CREATE_SINGLE_WRITER_METRIC = SingleWriterMetric::new;
private static final BiFunction<String, Unit, AbstractMetric>
CREATE_THREAD_SAFE_METRICS = ThreadSafeMetric::new;
-
private static final BiFunction<String, Unit, AbstractMetric>
CREATE_SINGLE_WRITER_QPS_METRIC = SingleWriterQPSMetric::new;
+ private static final BiFunction<String, Unit, AbstractMetric>
CREATE_THREAD_SAFE_QPS_METRIC = ThreadSafeQPSMetric::new;
private volatile Map<String, AbstractMetric> metrics;
- Metric metric(String name, Unit unit) {
+ public Metric metric(String name, Unit unit) {
return metric(name, unit, CREATE_SINGLE_WRITER_METRIC);
}
- Metric qpsMetric(String name, Unit unit) {
+ public Metric qpsMetric(String name, Unit unit) {
return metric(name, unit, CREATE_SINGLE_WRITER_QPS_METRIC);
}
- Metric threadSafeMetric(String name, Unit unit) {
+ public Metric threadSafeMetric(String name, Unit unit) {
return metric(name, unit, CREATE_THREAD_SAFE_METRICS);
}
+ public Metric threadSafeQpsMetric(String name, Unit unit) {
+ return metric(name, unit, CREATE_THREAD_SAFE_QPS_METRIC);
+ }
+
private Metric metric(String name, Unit unit, BiFunction<String, Unit,
AbstractMetric> metricSupplier) {
if (metrics == null) { //first metric being stored
metrics = new ConcurrentHashMap<>();
@@ -120,39 +124,35 @@ public class MetricsContext implements
DynamicMetricsProvider {
AtomicLongFieldUpdater.newUpdater(SingleWriterQPSMetric.class,
"value");
private volatile long value;
- private volatile long timestamp;
+ private final long timestamp;
SingleWriterQPSMetric(String name, Unit unit) {
super(name, unit);
+ timestamp = System.currentTimeMillis();
}
@Override
public void set(long newValue) {
- checkAndSetStartTime();
VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
}
@Override
public void increment() {
- checkAndSetStartTime();
VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
}
@Override
public void increment(long increment) {
- checkAndSetStartTime();
VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
}
@Override
public void decrement() {
- checkAndSetStartTime();
VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
}
@Override
public void decrement(long decrement) {
- checkAndSetStartTime();
VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
}
@@ -162,11 +162,52 @@ public class MetricsContext implements
DynamicMetricsProvider {
long cost = System.currentTimeMillis() - timestamp;
return (double) value * 1000 / cost;
}
+ }
+
+ private static final class ThreadSafeQPSMetric extends AbstractMetric {
+
+ private static final AtomicLongFieldUpdater<ThreadSafeQPSMetric>
VOLATILE_VALUE_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(ThreadSafeQPSMetric.class,
"value");
+
+ private volatile long value;
+
+ private final long timestamp;
+
+ ThreadSafeQPSMetric(String name, Unit unit) {
+ super(name, unit);
+ timestamp = System.currentTimeMillis();
+ }
+
+ @Override
+ public void increment() {
+ VOLATILE_VALUE_UPDATER.incrementAndGet(this);
+ }
+
+ @Override
+ public void increment(long amount) {
+ VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
+ }
+
+ @Override
+ public void decrement() {
+ VOLATILE_VALUE_UPDATER.decrementAndGet(this);
+ }
+
+ @Override
+ public void decrement(long amount) {
+ VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
+ }
- private void checkAndSetStartTime(){
- if (timestamp == 0){
- timestamp = System.currentTimeMillis();
- }
+ @Override
+ public void set(long newValue) {
+ VOLATILE_VALUE_UPDATER.set(this, newValue);
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Override
+ protected Object get() {
+ long cost = System.currentTimeMillis() - timestamp;
+ return (double) value * 1000 / cost;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
deleted file mode 100644
index 6eea089a2..000000000
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.metrics;
-
-import org.apache.seatunnel.api.common.metrics.Metric;
-import org.apache.seatunnel.api.common.metrics.Unit;
-
-public final class MetricsImpl {
-
- private static final ThreadLocal<Container> CONTEXT =
ThreadLocal.withInitial(Container::new);
-
- private MetricsImpl() {
- }
-
- public static Container container() {
- return CONTEXT.get();
- }
-
- public static Metric metric(String name, Unit unit) {
- return getContext().metric(name, unit);
- }
-
- public static Metric qpsMetric(String name, Unit unit) {
- return getContext().qpsMetric(name, unit);
- }
-
- public static Metric threadSafeMetric(String name, Unit unit) {
- return getContext().threadSafeMetric(name, unit);
- }
-
- private static org.apache.seatunnel.engine.server.metrics.MetricsContext
getContext() {
- Container container = CONTEXT.get();
- org.apache.seatunnel.engine.server.metrics.MetricsContext context =
container.getContext();
- if (context == null) {
- throw new RuntimeException("Thread %s has no metrics context set,
this method can " +
- "be called only on threads executing the job's
processors");
- }
- return context;
- }
-
- public static class Container {
-
- private org.apache.seatunnel.engine.server.metrics.MetricsContext
context;
-
- Container() {
- }
-
- public org.apache.seatunnel.engine.server.metrics.MetricsContext
getContext() {
- return context;
- }
-
- public void setContext(MetricsContext context) {
- this.context = context;
- }
- }
-
-}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index d1269266a..09cd5bc13 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -23,7 +23,7 @@ import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVE
import org.apache.seatunnel.api.common.metrics.Unit;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.server.metrics.Metrics;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import java.io.IOException;
@@ -35,17 +35,20 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
- public SeaTunnelSourceCollector(Object checkpointLock,
List<OneInputFlowLifeCycle<Record<?>>> outputs) {
+ private final MetricsContext metricsContext;
+
+ public SeaTunnelSourceCollector(Object checkpointLock,
List<OneInputFlowLifeCycle<Record<?>>> outputs, MetricsContext metricsContext) {
this.checkpointLock = checkpointLock;
this.outputs = outputs;
+ this.metricsContext = metricsContext;
}
@Override
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
- Metrics.qpsMetric(SOURCE_RECEIVED_QPS, Unit.COUNT).increment();
- Metrics.metric(SOURCE_RECEIVED_COUNT, Unit.COUNT).increment();
+ metricsContext.threadSafeQpsMetric(SOURCE_RECEIVED_QPS,
Unit.COUNT).increment();
+ metricsContext.threadSafeMetric(SOURCE_RECEIVED_COUNT,
Unit.COUNT).increment();
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index a735307a9..b8c944e59 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -118,11 +118,11 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public void init() throws Exception {
super.init();
+ metricsContext = new MetricsContext();
this.currState = SeaTunnelTaskState.INIT;
flowFutures = new ArrayList<>();
allCycles = new ArrayList<>();
startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
- metricsContext = new MetricsContext();
for (FlowLifeCycle cycle : allCycles) {
cycle.init();
}
@@ -206,7 +206,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
} else if (f.getAction() instanceof SinkAction) {
lifeCycle = new SinkFlowLifeCycle<>((SinkAction)
f.getAction(), taskLocation, indexID, this,
((SinkConfig) f.getConfig()).getCommitterTask(),
- ((SinkConfig) f.getConfig()).isContainCommitter(),
completableFuture);
+ ((SinkConfig) f.getConfig()).isContainCommitter(),
+ completableFuture, this.getMetricsContext());
} else if (f.getAction() instanceof TransformChainAction) {
lifeCycle =
new
TransformFlowLifeCycle<SeaTunnelRow>((TransformChainAction) f.getAction(), this,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index b8a00c062..a0901070d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -52,7 +52,7 @@ public class SourceSeaTunnelTask<T, SplitT extends
SourceSplit> extends SeaTunne
if (!(startFlowLifeCycle instanceof SourceFlowLifeCycle)) {
throw new TaskRuntimeException("SourceSeaTunnelTask only support
SourceFlowLifeCycle, but get " + startFlowLifeCycle.getClass().getName());
} else {
- this.collector = new SeaTunnelSourceCollector<>(checkpointLock,
outputs);
+ this.collector = new SeaTunnelSourceCollector<>(checkpointLock,
outputs, this.getMetricsContext());
((SourceFlowLifeCycle<T, SplitT>)
startFlowLifeCycle).setCollector(collector);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 7458cd641..da6e8d0c8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -32,7 +32,7 @@ import
org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.Metrics;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -73,17 +73,21 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private Optional<CommitInfoT> lastCommitInfo;
+ private MetricsContext metricsContext;
+
private final boolean containAggCommitter;
public SinkFlowLifeCycle(SinkAction<T, StateT, CommitInfoT,
AggregatedCommitInfoT> sinkAction, TaskLocation taskLocation, int indexID,
SeaTunnelTask runningTask, TaskLocation
committerTaskLocation,
- boolean containAggCommitter,
CompletableFuture<Void> completableFuture) {
+ boolean containAggCommitter,
CompletableFuture<Void> completableFuture,
+ MetricsContext metricsContext) {
super(sinkAction, runningTask, completableFuture);
this.sinkAction = sinkAction;
this.indexID = indexID;
this.taskLocation = taskLocation;
this.committerTaskLocation = committerTaskLocation;
this.containAggCommitter = containAggCommitter;
+ this.metricsContext = metricsContext;
}
@Override
@@ -155,8 +159,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
return;
}
writer.write((T) record.getData());
- Metrics.qpsMetric(SINK_WRITE_QPS, Unit.COUNT).increment();
- Metrics.metric(SINK_WRITE_COUNT, Unit.COUNT).increment();
+ metricsContext.threadSafeQpsMetric(SINK_WRITE_QPS,
Unit.COUNT).increment();
+ metricsContext.threadSafeMetric(SINK_WRITE_COUNT,
Unit.COUNT).increment();
}
} catch (Exception e) {
throw new RuntimeException(e);