This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 976a8ca16 [Hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics (#3660)
976a8ca16 is described below

commit 976a8ca16a14590a7618b622127020cea101c938
Author: ic4y <[email protected]>
AuthorDate: Thu Dec 8 17:31:54 2022 +0800

    [Hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics (#3660)
    
    * [hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics
    
    * [hotfix][ST-Engine] Fix user-defined threads cannot use engine metrics
---
 .../engine/server/TaskExecutionService.java        |  8 ---
 .../seatunnel/engine/server/metrics/Metrics.java   | 55 -----------------
 .../engine/server/metrics/MetricsContext.java      | 69 ++++++++++++++++-----
 .../engine/server/metrics/MetricsImpl.java         | 72 ----------------------
 .../server/task/SeaTunnelSourceCollector.java      | 11 ++--
 .../engine/server/task/SeaTunnelTask.java          |  5 +-
 .../engine/server/task/SourceSeaTunnelTask.java    |  2 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java | 12 ++--
 8 files changed, 74 insertions(+), 160 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index ad6a592e4..37b3e93e9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -45,7 +45,6 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import org.apache.seatunnel.engine.server.metrics.MetricsImpl;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
 
@@ -340,11 +339,9 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
             ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
             Thread.currentThread().setContextClassLoader(classLoader);
             final Task t = tracker.task;
-            MetricsImpl.Container userMetricsContextContainer = 
MetricsImpl.container();
             try {
                 startedLatch.countDown();
                 t.init();
-                userMetricsContextContainer.setContext(t.getMetricsContext());
                 ProgressState result;
                 do {
                     result = t.call();
@@ -355,7 +352,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                 taskGroupExecutionTracker.exception(e);
             } finally {
                 taskGroupExecutionTracker.taskDone();
-                userMetricsContextContainer.setContext(null);
             }
             Thread.currentThread().setContextClassLoader(oldClassLoader);
         }
@@ -381,7 +377,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         public AtomicReference<TaskTracker> exclusiveTaskTracker = new 
AtomicReference<>();
         final TaskCallTimer timer;
         private Thread myThread;
-        private MetricsImpl.Container userMetricsContextContainer;
         public LinkedBlockingDeque<TaskTracker> taskqueue;
 
         @SuppressWarnings("checkstyle:MagicNumber")
@@ -396,7 +391,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         @Override
         public void run() {
             myThread = currentThread();
-            userMetricsContextContainer = MetricsImpl.container();
             while (keep.get() && isRunning) {
                 TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
                     exclusiveTaskTracker.get() :
@@ -420,7 +414,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                 try {
                     //run task
                     
myThread.setContextClassLoader(executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader());
-                    
userMetricsContextContainer.setContext(taskTracker.task.getMetricsContext());
                     call = taskTracker.task.call();
                     synchronized (timer) {
                         timer.timerStop();
@@ -437,7 +430,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                 } finally {
                     //stop timer
                     timer.timerStop();
-                    userMetricsContextContainer.setContext(null);
                 }
                 //task call finished
                 if (null != call) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
deleted file mode 100644
index 72a8b2e67..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.metrics;
-
-import org.apache.seatunnel.api.common.metrics.Metric;
-import org.apache.seatunnel.api.common.metrics.Unit;
-
-public final class Metrics {
-
-    private Metrics() {
-    }
-
-    public static Metric metric(String name) {
-        return MetricsImpl.metric(name, Unit.COUNT);
-    }
-
-    /**
-     * Same as {@link #metric(String)}, but allows us to also specify the
-     * measurement {@link Unit} of the metric.
-     */
-    public static Metric metric(String name, Unit unit) {
-        return MetricsImpl.metric(name, unit);
-    }
-
-    public static Metric qpsMetric(String name, Unit unit) {
-        return MetricsImpl.qpsMetric(name, unit);
-    }
-
-    public static Metric threadSafeMetric(String name) {
-        return MetricsImpl.threadSafeMetric(name, Unit.COUNT);
-    }
-
-    /**
-     * Same as {@link #threadSafeMetric(String)}, but allows us to also
-     * specify the measurement {@link Unit} of the metric.
-     */
-    public static Metric threadSafeMetric(String name, Unit unit) {
-        return MetricsImpl.threadSafeMetric(name, unit);
-    }
-}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
index f54e02f79..be8480229 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
@@ -36,23 +36,27 @@ public class MetricsContext implements 
DynamicMetricsProvider {
 
     private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_SINGLE_WRITER_METRIC = SingleWriterMetric::new;
     private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_THREAD_SAFE_METRICS = ThreadSafeMetric::new;
-
     private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_SINGLE_WRITER_QPS_METRIC = SingleWriterQPSMetric::new;
+    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_THREAD_SAFE_QPS_METRIC = ThreadSafeQPSMetric::new;
 
     private volatile Map<String, AbstractMetric> metrics;
 
-    Metric metric(String name, Unit unit) {
+    public Metric metric(String name, Unit unit) {
         return metric(name, unit, CREATE_SINGLE_WRITER_METRIC);
     }
 
-    Metric qpsMetric(String name, Unit unit) {
+    public Metric qpsMetric(String name, Unit unit) {
         return metric(name, unit, CREATE_SINGLE_WRITER_QPS_METRIC);
     }
 
-    Metric threadSafeMetric(String name, Unit unit) {
+    public Metric threadSafeMetric(String name, Unit unit) {
         return metric(name, unit, CREATE_THREAD_SAFE_METRICS);
     }
 
+    public Metric threadSafeQpsMetric(String name, Unit unit) {
+        return metric(name, unit, CREATE_THREAD_SAFE_QPS_METRIC);
+    }
+
     private Metric metric(String name, Unit unit, BiFunction<String, Unit, 
AbstractMetric> metricSupplier) {
         if (metrics == null) { //first metric being stored
             metrics = new ConcurrentHashMap<>();
@@ -120,39 +124,35 @@ public class MetricsContext implements 
DynamicMetricsProvider {
             AtomicLongFieldUpdater.newUpdater(SingleWriterQPSMetric.class, 
"value");
 
         private volatile long value;
-        private volatile long timestamp;
+        private final long timestamp;
 
         SingleWriterQPSMetric(String name, Unit unit) {
             super(name, unit);
+            timestamp = System.currentTimeMillis();
         }
 
         @Override
         public void set(long newValue) {
-            checkAndSetStartTime();
             VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
         }
 
         @Override
         public void increment() {
-            checkAndSetStartTime();
             VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
         }
 
         @Override
         public void increment(long increment) {
-            checkAndSetStartTime();
             VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
         }
 
         @Override
         public void decrement() {
-            checkAndSetStartTime();
             VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
         }
 
         @Override
         public void decrement(long decrement) {
-            checkAndSetStartTime();
             VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
         }
 
@@ -162,11 +162,52 @@ public class MetricsContext implements 
DynamicMetricsProvider {
             long cost = System.currentTimeMillis() - timestamp;
             return (double) value * 1000 / cost;
         }
+    }
+
+    private static final class ThreadSafeQPSMetric extends AbstractMetric {
+
+        private static final AtomicLongFieldUpdater<ThreadSafeQPSMetric> 
VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ThreadSafeQPSMetric.class, 
"value");
+
+        private volatile long value;
+
+        private final long timestamp;
+
+        ThreadSafeQPSMetric(String name, Unit unit) {
+            super(name, unit);
+            timestamp = System.currentTimeMillis();
+        }
+
+        @Override
+        public void increment() {
+            VOLATILE_VALUE_UPDATER.incrementAndGet(this);
+        }
+
+        @Override
+        public void increment(long amount) {
+            VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
+        }
+
+        @Override
+        public void decrement() {
+            VOLATILE_VALUE_UPDATER.decrementAndGet(this);
+        }
+
+        @Override
+        public void decrement(long amount) {
+            VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
+        }
 
-        private void checkAndSetStartTime(){
-            if (timestamp == 0){
-                timestamp = System.currentTimeMillis();
-            }
+        @Override
+        public void set(long newValue) {
+            VOLATILE_VALUE_UPDATER.set(this, newValue);
+        }
+
+        @SuppressWarnings("checkstyle:MagicNumber")
+        @Override
+        protected Object get() {
+            long cost = System.currentTimeMillis() - timestamp;
+            return (double) value * 1000 / cost;
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
deleted file mode 100644
index 6eea089a2..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.metrics;
-
-import org.apache.seatunnel.api.common.metrics.Metric;
-import org.apache.seatunnel.api.common.metrics.Unit;
-
-public final class MetricsImpl {
-
-    private static final ThreadLocal<Container> CONTEXT = 
ThreadLocal.withInitial(Container::new);
-
-    private MetricsImpl() {
-    }
-
-    public static Container container() {
-        return CONTEXT.get();
-    }
-
-    public static Metric metric(String name, Unit unit) {
-        return getContext().metric(name, unit);
-    }
-
-    public static Metric qpsMetric(String name, Unit unit) {
-        return getContext().qpsMetric(name, unit);
-    }
-
-    public static Metric threadSafeMetric(String name, Unit unit) {
-        return getContext().threadSafeMetric(name, unit);
-    }
-
-    private static org.apache.seatunnel.engine.server.metrics.MetricsContext 
getContext() {
-        Container container = CONTEXT.get();
-        org.apache.seatunnel.engine.server.metrics.MetricsContext context = 
container.getContext();
-        if (context == null) {
-            throw new RuntimeException("Thread %s has no metrics context set, 
this method can " +
-                    "be called only on threads executing the job's 
processors");
-        }
-        return context;
-    }
-
-    public static class Container {
-
-        private org.apache.seatunnel.engine.server.metrics.MetricsContext 
context;
-
-        Container() {
-        }
-
-        public org.apache.seatunnel.engine.server.metrics.MetricsContext 
getContext() {
-            return context;
-        }
-
-        public void setContext(MetricsContext context) {
-            this.context = context;
-        }
-    }
-
-}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index d1269266a..09cd5bc13 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -23,7 +23,7 @@ import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVE
 import org.apache.seatunnel.api.common.metrics.Unit;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.server.metrics.Metrics;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
 
 import java.io.IOException;
@@ -35,17 +35,20 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
 
     private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
 
-    public SeaTunnelSourceCollector(Object checkpointLock, 
List<OneInputFlowLifeCycle<Record<?>>> outputs) {
+    private final MetricsContext metricsContext;
+
+    public SeaTunnelSourceCollector(Object checkpointLock, 
List<OneInputFlowLifeCycle<Record<?>>> outputs, MetricsContext metricsContext) {
         this.checkpointLock = checkpointLock;
         this.outputs = outputs;
+        this.metricsContext = metricsContext;
     }
 
     @Override
     public void collect(T row) {
         try {
             sendRecordToNext(new Record<>(row));
-            Metrics.qpsMetric(SOURCE_RECEIVED_QPS, Unit.COUNT).increment();
-            Metrics.metric(SOURCE_RECEIVED_COUNT, Unit.COUNT).increment();
+            metricsContext.threadSafeQpsMetric(SOURCE_RECEIVED_QPS, 
Unit.COUNT).increment();
+            metricsContext.threadSafeMetric(SOURCE_RECEIVED_COUNT, 
Unit.COUNT).increment();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index a735307a9..b8c944e59 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -118,11 +118,11 @@ public abstract class SeaTunnelTask extends AbstractTask {
     @Override
     public void init() throws Exception {
         super.init();
+        metricsContext = new MetricsContext();
         this.currState = SeaTunnelTaskState.INIT;
         flowFutures = new ArrayList<>();
         allCycles = new ArrayList<>();
         startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
-        metricsContext = new MetricsContext();
         for (FlowLifeCycle cycle : allCycles) {
             cycle.init();
         }
@@ -206,7 +206,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
             } else if (f.getAction() instanceof SinkAction) {
                 lifeCycle = new SinkFlowLifeCycle<>((SinkAction) 
f.getAction(), taskLocation, indexID, this,
                     ((SinkConfig) f.getConfig()).getCommitterTask(),
-                    ((SinkConfig) f.getConfig()).isContainCommitter(), 
completableFuture);
+                    ((SinkConfig) f.getConfig()).isContainCommitter(),
+                    completableFuture, this.getMetricsContext());
             } else if (f.getAction() instanceof TransformChainAction) {
                 lifeCycle =
                     new 
TransformFlowLifeCycle<SeaTunnelRow>((TransformChainAction) f.getAction(), this,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index b8a00c062..a0901070d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -52,7 +52,7 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
         if (!(startFlowLifeCycle instanceof SourceFlowLifeCycle)) {
             throw new TaskRuntimeException("SourceSeaTunnelTask only support 
SourceFlowLifeCycle, but get " + startFlowLifeCycle.getClass().getName());
         } else {
-            this.collector = new SeaTunnelSourceCollector<>(checkpointLock, 
outputs);
+            this.collector = new SeaTunnelSourceCollector<>(checkpointLock, 
outputs, this.getMetricsContext());
             ((SourceFlowLifeCycle<T, SplitT>) 
startFlowLifeCycle).setCollector(collector);
         }
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 7458cd641..da6e8d0c8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -32,7 +32,7 @@ import 
org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.Metrics;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
 import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -73,17 +73,21 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private Optional<CommitInfoT> lastCommitInfo;
 
+    private MetricsContext metricsContext;
+
     private final boolean containAggCommitter;
 
     public SinkFlowLifeCycle(SinkAction<T, StateT, CommitInfoT, 
AggregatedCommitInfoT> sinkAction, TaskLocation taskLocation, int indexID,
                              SeaTunnelTask runningTask, TaskLocation 
committerTaskLocation,
-                             boolean containAggCommitter, 
CompletableFuture<Void> completableFuture) {
+                             boolean containAggCommitter, 
CompletableFuture<Void> completableFuture,
+                             MetricsContext metricsContext) {
         super(sinkAction, runningTask, completableFuture);
         this.sinkAction = sinkAction;
         this.indexID = indexID;
         this.taskLocation = taskLocation;
         this.committerTaskLocation = committerTaskLocation;
         this.containAggCommitter = containAggCommitter;
+        this.metricsContext = metricsContext;
     }
 
     @Override
@@ -155,8 +159,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                     return;
                 }
                 writer.write((T) record.getData());
-                Metrics.qpsMetric(SINK_WRITE_QPS, Unit.COUNT).increment();
-                Metrics.metric(SINK_WRITE_COUNT, Unit.COUNT).increment();
+                metricsContext.threadSafeQpsMetric(SINK_WRITE_QPS, 
Unit.COUNT).increment();
+                metricsContext.threadSafeMetric(SINK_WRITE_COUNT, 
Unit.COUNT).increment();
             }
         } catch (Exception e) {
             throw new RuntimeException(e);

Reply via email to