This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 32e1f91c7 [Feature][API] Add Metrics for Connector-V2 (#4017)
32e1f91c7 is described below

commit 32e1f91c7a315e1d2718bf2380e841bcbeef4ba4
Author: ic4y <[email protected]>
AuthorDate: Wed Feb 15 22:49:55 2023 +0800

    [Feature][API] Add Metrics for Connector-V2 (#4017)
    
    * [Feature][API] add Metrics for Connector V2
    
    * [Feature][API] add Metrics for Connector V2
    
    * fix merge conflict
    
    * fix merge conflict
    
    * fix merge conflict
    
    * fix Elasticsearch test container start timeOut
    
    * fix checkstyle
---
 .../api/common/metrics/AbstractMetricsContext.java |  82 ++++++
 .../common/metrics/{Metric.java => Counter.java}   |  43 +--
 .../apache/seatunnel/api/common/metrics/Meter.java |  36 ++-
 .../seatunnel/api/common/metrics/Metric.java       |  19 +-
 .../api/common/metrics/MetricsContext.java         |  57 ++++
 .../api/common/metrics/ThreadSafeCounter.java      |  81 ++++++
 .../api/common/metrics/ThreadSafeQPSMeter.java     |  85 ++++++
 .../api/sink/DefaultSinkWriterContext.java         |  10 +
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |   5 +
 .../apache/seatunnel/api/source/SourceReader.java  |   4 +
 .../api/source/SourceSplitEnumerator.java          |   4 +
 .../cdc/base/source/IncrementalSource.java         |   8 +-
 .../reader/IncrementalSourceRecordEmitter.java     |  35 ++-
 .../engine/server/TaskExecutionService.java        |   4 +-
 .../seatunnel/engine/server/execution/Task.java    |   2 +-
 .../server/execution/TaskExecutionContext.java     |   8 +-
 .../seatunnel/engine/server/master/JobMaster.java  |   4 +-
 .../engine/server/metrics/MetricsContext.java      | 314 ---------------------
 .../server/metrics/SeaTunnelMetricsContext.java    |  67 +++++
 .../engine/server/task/CoordinatorTask.java        |  30 ++
 .../server/task/SeaTunnelSourceCollector.java      |  16 +-
 .../engine/server/task/SeaTunnelTask.java          |  13 +-
 .../engine/server/task/SourceSeaTunnelTask.java    |   7 +-
 .../server/task/SourceSplitEnumeratorTask.java     |   3 +-
 .../engine/server/task/TransformSeaTunnelTask.java |   4 +-
 .../context/SeaTunnelSplitEnumeratorContext.java   |  13 +-
 .../server/task/context/SinkWriterContext.java     |  10 +-
 .../server/task/context/SourceReaderContext.java   |  14 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  24 +-
 .../server/task/flow/SourceFlowLifeCycle.java      |  12 +-
 .../source/CoordinatedEnumeratorContext.java       |   9 +
 .../source/CoordinatedReaderContext.java           |   9 +
 .../source/ParallelEnumeratorContext.java          |   9 +
 .../translation/source/ParallelReaderContext.java  |   9 +
 34 files changed, 648 insertions(+), 402 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/AbstractMetricsContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/AbstractMetricsContext.java
new file mode 100644
index 000000000..a8ce35485
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/AbstractMetricsContext.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public abstract class AbstractMetricsContext implements MetricsContext, 
Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final Map<String, Metric> metrics = new ConcurrentHashMap<>();
+
+    @Override
+    public Counter counter(String name) {
+        if (metrics.containsKey(name)) {
+            return (Counter) metrics.get(name);
+        }
+        return this.counter(name, new ThreadSafeCounter(name));
+    }
+
+    @Override
+    public <C extends Counter> C counter(String name, C counter) {
+        this.addMetric(name, counter);
+        return counter;
+    }
+
+    @Override
+    public Meter meter(String name) {
+        if (metrics.containsKey(name)) {
+            return (Meter) metrics.get(name);
+        }
+        return this.meter(name, new ThreadSafeQPSMeter(name));
+    }
+
+    @Override
+    public <M extends Meter> M meter(String name, M meter) {
+        this.addMetric(name, meter);
+        return meter;
+    }
+
+    protected void addMetric(String name, Metric metric) {
+        if (metric == null) {
+            log.warn("Ignoring attempted add of a metric due to being null for 
name {}.", name);
+        } else {
+            synchronized (this) {
+                Metric prior = this.metrics.put(name, metric);
+                if (prior != null) {
+                    this.metrics.put(name, prior);
+                    log.warn(
+                            "Name collision: MetricsContext already contains a 
Metric with the name '"
+                                    + name
+                                    + "'. Metric will not be reported.");
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "AbstractMetricsContext{" + "metrics=" + metrics + '}';
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Counter.java
similarity index 52%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Counter.java
index 97127fda4..0bd657b02 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Counter.java
@@ -17,31 +17,36 @@
 
 package org.apache.seatunnel.api.common.metrics;
 
-public interface Metric {
+/** A Counter is a {@link Metric} that measures a count. */
+public interface Counter extends Metric {
 
-    /** Returns the name of the associated metric. */
-    String name();
+    /** Increment the current count by 1. */
+    void inc();
 
     /**
-     * Return the measurement unit for the associated metric. Meant to provide 
further information
-     * on the type of value measured by the user-defined metric. Doesn't 
affect the functionality of
-     * the metric, it still remains a simple numeric value, but is used to 
populate the {@link
-     * MetricTags#UNIT} tag in the metric's description.
+     * Increment the current count by the given value.
+     *
+     * @param n value to increment the current count by
      */
-    Unit unit();
+    void inc(long n);
 
-    /** Increments the current value by 1. */
-    void increment();
+    /** Decrement the current count by 1. */
+    void dec();
 
-    /** Increments the current value by the specified amount. */
-    void increment(long amount);
-
-    /** Decrements the current value by 1. */
-    void decrement();
-
-    /** Decrements the current value by the specified amount. */
-    void decrement(long amount);
+    /**
+     * Decrement the current count by the given value.
+     *
+     * @param n value to decrement the current count by
+     */
+    void dec(long n);
 
     /** Sets the current value. */
-    void set(long newValue);
+    void set(long n);
+
+    /**
+     * Returns the current count.
+     *
+     * @return current count
+     */
+    long getCount();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Meter.java
similarity index 56%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Meter.java
index b23090bd9..69b4fdf50 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Meter.java
@@ -15,21 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.context;
+package org.apache.seatunnel.api.common.metrics;
 
-import org.apache.seatunnel.api.sink.SinkWriter;
+/** Metric for measuring throughput. */
+public interface Meter extends Metric {
+    /** Mark occurrence of an event. */
+    void markEvent();
 
-public class SinkWriterContext implements SinkWriter.Context {
+    /**
+     * Mark occurrence of multiple events.
+     *
+     * @param n number of events occurred
+     */
+    void markEvent(long n);
 
-    private static final long serialVersionUID = -3082515319043725121L;
-    private final int indexID;
+    /**
+     * Returns the current rate of events per second.
+     *
+     * @return current rate of events per second
+     */
+    double getRate();
 
-    public SinkWriterContext(int indexID) {
-        this.indexID = indexID;
-    }
-
-    @Override
-    public int getIndexOfSubtask() {
-        return indexID;
-    }
+    /**
+     * Get number of events marked on the meter.
+     *
+     * @return number of events marked on the meter
+     */
+    long getCount();
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
index 97127fda4..55b1cdee4 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.api.common.metrics;
 
-public interface Metric {
+import java.io.Serializable;
+
+public interface Metric extends Serializable {
 
     /** Returns the name of the associated metric. */
     String name();
@@ -29,19 +31,4 @@ public interface Metric {
      * MetricTags#UNIT} tag in the metric's description.
      */
     Unit unit();
-
-    /** Increments the current value by 1. */
-    void increment();
-
-    /** Increments the current value by the specified amount. */
-    void increment(long amount);
-
-    /** Decrements the current value by 1. */
-    void decrement();
-
-    /** Decrements the current value by the specified amount. */
-    void decrement(long amount);
-
-    /** Sets the current value. */
-    void set(long newValue);
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricsContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricsContext.java
new file mode 100644
index 000000000..a8875d91d
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricsContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+public interface MetricsContext {
+
+    /**
+     * Registers a {@link ThreadSafeCounter} with SeaTunnel.
+     *
+     * @param name name of the counter
+     * @return the created counter
+     */
+    Counter counter(String name);
+
+    /**
+     * Registers a {@link Counter} with SeaTunnel.
+     *
+     * @param name name of the counter
+     * @param counter counter to register
+     * @param <C> counter type
+     * @return the given counter
+     */
+    <C extends Counter> C counter(String name, C counter);
+
+    /**
+     * Registers a {@link ThreadSafeQPSMeter} with SeaTunnel.
+     *
+     * @param name name of the meter
+     * @return the registered meter
+     */
+    Meter meter(String name);
+
+    /**
+     * Registers a new {@link Meter} with SeaTunnel.
+     *
+     * @param name name of the meter
+     * @param meter meter to register
+     * @param <M> meter type
+     * @return the registered meter
+     */
+    <M extends Meter> M meter(String name, M meter);
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeCounter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeCounter.java
new file mode 100644
index 000000000..7094bc429
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeCounter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+public class ThreadSafeCounter implements Counter, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String name;
+    private static final AtomicLongFieldUpdater<ThreadSafeCounter> 
VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ThreadSafeCounter.class, 
"value");
+
+    private volatile long value;
+
+    public ThreadSafeCounter(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void inc() {
+        VOLATILE_VALUE_UPDATER.incrementAndGet(this);
+    }
+
+    @Override
+    public void inc(long n) {
+        VOLATILE_VALUE_UPDATER.addAndGet(this, n);
+    }
+
+    @Override
+    public void dec() {
+        VOLATILE_VALUE_UPDATER.decrementAndGet(this);
+    }
+
+    @Override
+    public void dec(long n) {
+        VOLATILE_VALUE_UPDATER.addAndGet(this, -n);
+    }
+
+    @Override
+    public void set(long n) {
+        VOLATILE_VALUE_UPDATER.set(this, n);
+    }
+
+    @Override
+    public long getCount() {
+        return VOLATILE_VALUE_UPDATER.get(this);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreadSafeCounter{" + "name='" + name + '\'' + ", value=" + 
value + '}';
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeQPSMeter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeQPSMeter.java
new file mode 100644
index 000000000..627e9bd4c
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeQPSMeter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+public class ThreadSafeQPSMeter implements Meter, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final AtomicLongFieldUpdater<ThreadSafeQPSMeter> 
VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ThreadSafeQPSMeter.class, 
"value");
+
+    private final String name;
+
+    private volatile long value;
+
+    private final long timestamp;
+
+    public ThreadSafeQPSMeter(String name) {
+        this.name = name;
+        timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public void markEvent() {
+        VOLATILE_VALUE_UPDATER.incrementAndGet(this);
+    }
+
+    @Override
+    public void markEvent(long n) {
+        VOLATILE_VALUE_UPDATER.addAndGet(this, n);
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public double getRate() {
+        long cost = System.currentTimeMillis() - timestamp;
+        return (double) value * 1000 / cost;
+    }
+
+    @Override
+    public long getCount() {
+        return VOLATILE_VALUE_UPDATER.get(this);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreadSafeQPSMeter{"
+                + "name='"
+                + name
+                + '\''
+                + ", value="
+                + value
+                + ", timestamp="
+                + timestamp
+                + '}';
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index 05fd64034..43a92f35f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.api.sink;
 
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
 /** The default {@link SinkWriter.Context} implement class. */
 public class DefaultSinkWriterContext implements SinkWriter.Context {
     private final int subtask;
@@ -29,4 +32,11 @@ public class DefaultSinkWriterContext implements 
SinkWriter.Context {
     public int getIndexOfSubtask() {
         return subtask;
     }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        // TODO Waiting for Flink and Spark to implement MetricsContext
+        // https://github.com/apache/incubator-seatunnel/issues/3431
+        return new AbstractMetricsContext() {};
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index e6996ec83..c0fbe2c02 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.api.sink;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -79,5 +81,8 @@ public interface SinkWriter<T, CommitInfoT, StateT> {
 
         /** @return The index of this subtask. */
         int getIndexOfSubtask();
+
+        /** @return metricsContext of this writer. */
+        MetricsContext getMetricsContext();
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index bb98308ff..50ec0c137 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.source;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.state.CheckpointListener;
 
 import java.io.IOException;
@@ -107,5 +108,8 @@ public interface SourceReader<T, SplitT extends SourceSplit>
          * @param sourceEvent the source event to coordinator.
          */
         void sendSourceEventToEnumerator(SourceEvent sourceEvent);
+
+        /** @return metricsContext of this reader. */
+        MetricsContext getMetricsContext();
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 7acf6d677..49d3c7c1f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.source;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.state.CheckpointListener;
 
 import java.io.IOException;
@@ -116,5 +117,8 @@ public interface SourceSplitEnumerator<SplitT extends 
SourceSplit, StateT>
          * @param event the source event to send.
          */
         void sendEventToSourceReader(int subtaskId, SourceEvent event);
+
+        /** @return metricsContext of this enumerator. */
+        MetricsContext getMetricsContext();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 5c56a73fe..4cd707da4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.cdc.base.source;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -142,15 +143,16 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
         return new IncrementalSourceReader<>(
                 elementsQueue,
                 splitReaderSupplier,
-                createRecordEmitter(sourceConfig),
+                createRecordEmitter(sourceConfig, 
readerContext.getMetricsContext()),
                 new SourceReaderOptions(readonlyConfig),
                 readerContext,
                 sourceConfig);
     }
 
     protected RecordEmitter<SourceRecords, T, SourceSplitStateBase> 
createRecordEmitter(
-            SourceConfig sourceConfig) {
-        return new IncrementalSourceRecordEmitter<>(deserializationSchema, 
offsetFactory);
+            SourceConfig sourceConfig, MetricsContext metricsContext) {
+        return new IncrementalSourceRecordEmitter<>(
+                deserializationSchema, offsetFactory, metricsContext);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index a0969ac02..49eb17ede 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.reader;
 
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
@@ -35,6 +37,8 @@ import java.util.Map;
 
 import static 
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isHighWatermarkEvent;
 import static 
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent;
+import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getFetchTimestamp;
+import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getMessageTimestamp;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isSchemaChangeEvent;
 
@@ -48,17 +52,26 @@ import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.i
 public class IncrementalSourceRecordEmitter<T>
         implements RecordEmitter<SourceRecords, T, SourceSplitStateBase> {
 
+    private static final String CDC_RECORD_FETCH_DELAY = "CDCRecordFetchDelay";
+    private static final String CDC_RECORD_EMIT_DELAY = "CDCRecordEmitDelay";
+
     protected final DebeziumDeserializationSchema<T> 
debeziumDeserializationSchema;
     protected final OutputCollector<T> outputCollector;
 
     protected final OffsetFactory offsetFactory;
 
+    protected final Counter recordFetchDelay;
+    protected final Counter recordEmitDelay;
+
     public IncrementalSourceRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
-            OffsetFactory offsetFactory) {
+            OffsetFactory offsetFactory,
+            MetricsContext metricsContext) {
         this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.outputCollector = new OutputCollector<>();
         this.offsetFactory = offsetFactory;
+        this.recordFetchDelay = metricsContext.counter(CDC_RECORD_FETCH_DELAY);
+        this.recordEmitDelay = metricsContext.counter(CDC_RECORD_EMIT_DELAY);
     }
 
     @Override
@@ -67,7 +80,25 @@ public class IncrementalSourceRecordEmitter<T>
             throws Exception {
         final Iterator<SourceRecord> elementIterator = 
sourceRecords.iterator();
         while (elementIterator.hasNext()) {
-            processElement(elementIterator.next(), collector, splitState);
+            SourceRecord next = elementIterator.next();
+            reportMetrics(next);
+            processElement(next, collector, splitState);
+        }
+    }
+
+    protected void reportMetrics(SourceRecord element) {
+        long now = System.currentTimeMillis();
+        // record the latest process time
+        Long messageTimestamp = getMessageTimestamp(element);
+
+        if (messageTimestamp != null && messageTimestamp > 0L) {
+            // report fetch delay
+            Long fetchTimestamp = getFetchTimestamp(element);
+            if (fetchTimestamp != null) {
+                recordFetchDelay.set(fetchTimestamp - messageTimestamp);
+            }
+            // report emit delay
+            recordEmitDelay.set(now - messageTimestamp);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 089b27aeb..c221a78e8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -36,7 +36,7 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
@@ -432,7 +432,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         contextMap.putAll(executionContexts);
         contextMap.putAll(finishedExecutionContexts);
         try {
-            IMap<TaskLocation, MetricsContext> map =
+            IMap<TaskLocation, SeaTunnelMetricsContext> map =
                     
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
             contextMap.forEach(
                     (taskGroupLocation, taskGroupContext) -> {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 820ba6abc..58afa695d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.Stateful;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.internal.metrics.DynamicMetricsProvider;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 7d51803dc..bf4434544 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.execution;
 
 import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
@@ -50,10 +50,10 @@ public class TaskExecutionContext {
         return nodeEngine.getLogger(task.getClass());
     }
 
-    public MetricsContext getOrCreateMetricsContext(TaskLocation taskLocation) 
{
-        IMap<TaskLocation, MetricsContext> map =
+    public SeaTunnelMetricsContext getOrCreateMetricsContext(TaskLocation 
taskLocation) {
+        IMap<TaskLocation, SeaTunnelMetricsContext> map =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
-        return map.computeIfAbsent(taskLocation, k -> new MetricsContext());
+        return map.computeIfAbsent(taskLocation, k -> new 
SeaTunnelMetricsContext());
     }
 
     public <T> T getTask() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 12a54359a..21e11d992 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -50,7 +50,7 @@ import 
org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -488,7 +488,7 @@ public class JobMaster {
             PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
         if (pipelineStatus.equals(PipelineStatus.FINISHED) && 
!checkpointManager.isSavePointEnd()
                 || pipelineStatus.equals(PipelineStatus.CANCELED)) {
-            IMap<TaskLocation, MetricsContext> map =
+            IMap<TaskLocation, SeaTunnelMetricsContext> map =
                     
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
             map.keySet().stream()
                     .filter(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
deleted file mode 100644
index 40023968d..000000000
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.metrics;
-
-import org.apache.seatunnel.api.common.metrics.Metric;
-import org.apache.seatunnel.api.common.metrics.Unit;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
-
-import com.hazelcast.internal.metrics.DynamicMetricsProvider;
-import com.hazelcast.internal.metrics.MetricDescriptor;
-import com.hazelcast.internal.metrics.MetricsCollectionContext;
-import com.hazelcast.internal.metrics.ProbeLevel;
-import com.hazelcast.internal.metrics.ProbeUnit;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.function.BiFunction;
-
-public class MetricsContext implements DynamicMetricsProvider, Serializable {
-
-    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_SINGLE_WRITER_METRIC =
-            SingleWriterMetric::new;
-    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_THREAD_SAFE_METRICS =
-            ThreadSafeMetric::new;
-    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_SINGLE_WRITER_QPS_METRIC =
-            SingleWriterQPSMetric::new;
-    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_THREAD_SAFE_QPS_METRIC =
-            ThreadSafeQPSMetric::new;
-
-    private volatile Map<String, AbstractMetric> metrics;
-
-    public Metric metric(String name, Unit unit) {
-        return metric(name, unit, CREATE_SINGLE_WRITER_METRIC);
-    }
-
-    public Metric qpsMetric(String name, Unit unit) {
-        return metric(name, unit, CREATE_SINGLE_WRITER_QPS_METRIC);
-    }
-
-    public Metric threadSafeMetric(String name, Unit unit) {
-        return metric(name, unit, CREATE_THREAD_SAFE_METRICS);
-    }
-
-    public Metric threadSafeQpsMetric(String name, Unit unit) {
-        return metric(name, unit, CREATE_THREAD_SAFE_QPS_METRIC);
-    }
-
-    private Metric metric(
-            String name, Unit unit, BiFunction<String, Unit, AbstractMetric> 
metricSupplier) {
-        if (metrics == null) { // first metric being stored
-            metrics = new ConcurrentHashMap<>();
-        }
-
-        AbstractMetric metric = metrics.get(name);
-        if (metric != null) {
-            return metric;
-        }
-
-        metric = metricSupplier.apply(name, unit);
-        metrics.put(name, metric);
-
-        return metric;
-    }
-
-    @Override
-    public void provideDynamicMetrics(MetricDescriptor tagger, 
MetricsCollectionContext context) {
-        if (metrics != null) {
-            metrics.forEach(
-                    (name, metric) -> {
-                        if (metric.get() instanceof Long) {
-                            context.collect(
-                                    tagger.copy(),
-                                    name,
-                                    ProbeLevel.INFO,
-                                    toProbeUnit(metric.unit()),
-                                    (Long) metric.get());
-                        } else if (metric.get() instanceof Double) {
-                            context.collect(
-                                    tagger.copy(),
-                                    name,
-                                    ProbeLevel.INFO,
-                                    toProbeUnit(metric.unit()),
-                                    (Double) metric.get());
-                        } else {
-                            throw new SeaTunnelException(
-                                    "The value of Metric does not support "
-                                            + 
metric.get().getClass().getSimpleName()
-                                            + " data type");
-                        }
-                    });
-        }
-    }
-
-    private ProbeUnit toProbeUnit(Unit unit) {
-        return ProbeUnit.valueOf(unit.name());
-    }
-
-    private abstract static class AbstractMetric implements Metric, 
Serializable {
-
-        private final String name;
-        private final Unit unit;
-
-        AbstractMetric(String name, Unit unit) {
-            this.name = name;
-            this.unit = unit;
-        }
-
-        @Override
-        public String name() {
-            return name;
-        }
-
-        @Override
-        public Unit unit() {
-            return unit;
-        }
-
-        protected abstract Object get();
-    }
-
-    private static final class SingleWriterQPSMetric extends AbstractMetric {
-
-        private static final AtomicLongFieldUpdater<SingleWriterQPSMetric> 
VOLATILE_VALUE_UPDATER =
-                AtomicLongFieldUpdater.newUpdater(SingleWriterQPSMetric.class, 
"value");
-
-        private volatile long value;
-        private final long timestamp;
-
-        SingleWriterQPSMetric(String name, Unit unit) {
-            super(name, unit);
-            timestamp = System.currentTimeMillis();
-        }
-
-        @Override
-        public void set(long newValue) {
-            VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
-        }
-
-        @Override
-        public void increment() {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
-        }
-
-        @Override
-        public void increment(long increment) {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
-        }
-
-        @Override
-        public void decrement() {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
-        }
-
-        @Override
-        public void decrement(long decrement) {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
-        }
-
-        @SuppressWarnings("checkstyle:MagicNumber")
-        @Override
-        protected Object get() {
-            long cost = System.currentTimeMillis() - timestamp;
-            return (double) value * 1000 / cost;
-        }
-    }
-
-    private static final class ThreadSafeQPSMetric extends AbstractMetric {
-
-        private static final AtomicLongFieldUpdater<ThreadSafeQPSMetric> 
VOLATILE_VALUE_UPDATER =
-                AtomicLongFieldUpdater.newUpdater(ThreadSafeQPSMetric.class, 
"value");
-
-        private volatile long value;
-
-        private final long timestamp;
-
-        ThreadSafeQPSMetric(String name, Unit unit) {
-            super(name, unit);
-            timestamp = System.currentTimeMillis();
-        }
-
-        @Override
-        public void increment() {
-            VOLATILE_VALUE_UPDATER.incrementAndGet(this);
-        }
-
-        @Override
-        public void increment(long amount) {
-            VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
-        }
-
-        @Override
-        public void decrement() {
-            VOLATILE_VALUE_UPDATER.decrementAndGet(this);
-        }
-
-        @Override
-        public void decrement(long amount) {
-            VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
-        }
-
-        @Override
-        public void set(long newValue) {
-            VOLATILE_VALUE_UPDATER.set(this, newValue);
-        }
-
-        @SuppressWarnings("checkstyle:MagicNumber")
-        @Override
-        protected Object get() {
-            long cost = System.currentTimeMillis() - timestamp;
-            return (double) value * 1000 / cost;
-        }
-    }
-
-    private static final class SingleWriterMetric extends AbstractMetric {
-
-        private static final AtomicLongFieldUpdater<SingleWriterMetric> 
VOLATILE_VALUE_UPDATER =
-                AtomicLongFieldUpdater.newUpdater(SingleWriterMetric.class, 
"value");
-
-        private volatile long value;
-
-        SingleWriterMetric(String name, Unit unit) {
-            super(name, unit);
-        }
-
-        @Override
-        public void set(long newValue) {
-            VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
-        }
-
-        @Override
-        public void increment() {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
-        }
-
-        @Override
-        public void increment(long increment) {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
-        }
-
-        @Override
-        public void decrement() {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
-        }
-
-        @Override
-        public void decrement(long decrement) {
-            VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
-        }
-
-        @Override
-        protected Object get() {
-            return value;
-        }
-    }
-
-    private static final class ThreadSafeMetric extends AbstractMetric {
-
-        private static final AtomicLongFieldUpdater<ThreadSafeMetric> 
VOLATILE_VALUE_UPDATER =
-                AtomicLongFieldUpdater.newUpdater(ThreadSafeMetric.class, 
"value");
-
-        private volatile long value;
-
-        ThreadSafeMetric(String name, Unit unit) {
-            super(name, unit);
-        }
-
-        @Override
-        public void increment() {
-            VOLATILE_VALUE_UPDATER.incrementAndGet(this);
-        }
-
-        @Override
-        public void increment(long amount) {
-            VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
-        }
-
-        @Override
-        public void decrement() {
-            VOLATILE_VALUE_UPDATER.decrementAndGet(this);
-        }
-
-        @Override
-        public void decrement(long amount) {
-            VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
-        }
-
-        @Override
-        public void set(long newValue) {
-            VOLATILE_VALUE_UPDATER.set(this, newValue);
-        }
-
-        @Override
-        protected Object get() {
-            return VOLATILE_VALUE_UPDATER.get(this);
-        }
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/SeaTunnelMetricsContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/SeaTunnelMetricsContext.java
new file mode 100644
index 000000000..2ed763b99
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/SeaTunnelMetricsContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
+import com.hazelcast.internal.metrics.ProbeLevel;
+import com.hazelcast.internal.metrics.ProbeUnit;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SeaTunnelMetricsContext extends AbstractMetricsContext
+        implements DynamicMetricsProvider {
+
+    @Override
+    public void provideDynamicMetrics(MetricDescriptor tagger, 
MetricsCollectionContext context) {
+        metrics.forEach(
+                (name, metric) -> {
+                    if (metric instanceof Counter) {
+                        context.collect(
+                                tagger.copy(),
+                                name,
+                                ProbeLevel.INFO,
+                                toProbeUnit(metric.unit()),
+                                ((Counter) metric).getCount());
+                    } else if (metric instanceof Meter) {
+                        context.collect(
+                                tagger.copy(),
+                                name,
+                                ProbeLevel.INFO,
+                                toProbeUnit(metric.unit()),
+                                ((Meter) metric).getRate());
+                    } else {
+                        throw new SeaTunnelException(
+                                "The value of Metric does not support "
+                                        + metric.getClass().getSimpleName()
+                                        + " data type");
+                    }
+                });
+    }
+
+    private ProbeUnit toProbeUnit(Unit unit) {
+        return ProbeUnit.valueOf(unit.name());
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
index c4b019cd6..b56266de4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
@@ -17,13 +17,43 @@
 
 package org.apache.seatunnel.engine.server.task;
 
+import org.apache.seatunnel.api.common.metrics.MetricTags;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
 
 public abstract class CoordinatorTask extends AbstractTask {
 
     private static final long serialVersionUID = -3957168748281681077L;
 
+    private SeaTunnelMetricsContext metricsContext;
+
     public CoordinatorTask(long jobID, TaskLocation taskID) {
         super(jobID, taskID);
     }
+
+    @Override
+    public void init() throws Exception {
+        super.init();
+        metricsContext = 
getExecutionContext().getOrCreateMetricsContext(taskLocation);
+    }
+
+    @Override
+    public SeaTunnelMetricsContext getMetricsContext() {
+        return metricsContext;
+    }
+
+    @Override
+    public void provideDynamicMetrics(
+            MetricDescriptor descriptor, MetricsCollectionContext context) {
+        if (null != metricsContext) {
+            metricsContext.provideDynamicMetrics(
+                    descriptor
+                            .copy()
+                            .withTag(MetricTags.TASK_NAME, 
this.getClass().getSimpleName()),
+                    context);
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 03ff826d7..7ebcdf0d8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -17,10 +17,11 @@
 
 package org.apache.seatunnel.engine.server.task;
 
-import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
 
 import java.io.IOException;
@@ -35,7 +36,9 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
 
     private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
 
-    private final MetricsContext metricsContext;
+    private final Counter sourceReceivedCount;
+
+    private final Meter sourceReceivedQPS;
 
     public SeaTunnelSourceCollector(
             Object checkpointLock,
@@ -43,15 +46,16 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
             MetricsContext metricsContext) {
         this.checkpointLock = checkpointLock;
         this.outputs = outputs;
-        this.metricsContext = metricsContext;
+        sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
+        sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
     }
 
     @Override
     public void collect(T row) {
         try {
             sendRecordToNext(new Record<>(row));
-            metricsContext.threadSafeQpsMetric(SOURCE_RECEIVED_QPS, 
Unit.COUNT).increment();
-            metricsContext.threadSafeMetric(SOURCE_RECEIVED_COUNT, 
Unit.COUNT).increment();
+            sourceReceivedCount.inc();
+            sourceReceivedQPS.markEvent();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index e8418c71d..942514055 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.api.common.metrics.MetricTags;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.function.ConsumerWithException;
@@ -40,7 +41,7 @@ import 
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlo
 import 
org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
 import 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
@@ -108,7 +109,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     private TaskGroup taskBelongGroup;
 
-    private MetricsContext metricsContext;
+    private SeaTunnelMetricsContext metricsContext;
 
     public SeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow 
executionFlow) {
         super(jobID, taskID);
@@ -208,7 +209,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
                         createSourceFlowLifeCycle(
                                 (SourceAction<?, ?, ?>) f.getAction(),
                                 (SourceConfig) f.getConfig(),
-                                completableFuture);
+                                completableFuture,
+                                this.getMetricsContext());
                 outputs = flowLifeCycles;
             } else if (f.getAction() instanceof SinkAction) {
                 lifeCycle =
@@ -258,7 +260,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
     protected abstract SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(
             SourceAction<?, ?, ?> sourceAction,
             SourceConfig config,
-            CompletableFuture<Void> completableFuture);
+            CompletableFuture<Void> completableFuture,
+            MetricsContext metricsContext);
 
     protected abstract void collect() throws Exception;
 
@@ -364,7 +367,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
     }
 
     @Override
-    public MetricsContext getMetricsContext() {
+    public SeaTunnelMetricsContext getMetricsContext() {
         return metricsContext;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 0e9b4142d..842cf8a60 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
@@ -66,14 +67,16 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
     protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(
             SourceAction<?, ?, ?> sourceAction,
             SourceConfig config,
-            CompletableFuture<Void> completableFuture) {
+            CompletableFuture<Void> completableFuture,
+            MetricsContext metricsContext) {
         return new SourceFlowLifeCycle<>(
                 sourceAction,
                 indexID,
                 config.getEnumeratorTask(),
                 this,
                 taskLocation,
-                completableFuture);
+                completableFuture,
+                metricsContext);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 60ab183b9..97bd53405 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -96,7 +96,8 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 "starting seatunnel source split enumerator task, source name: 
"
                         + source.getName());
         enumeratorContext =
-                new 
SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
+                new SeaTunnelSplitEnumeratorContext<>(
+                        this.source.getParallelism(), this, 
getMetricsContext());
         enumeratorStateSerializer = 
this.source.getSource().getEnumeratorStateSerializer();
         taskMemberMapping = new ConcurrentHashMap<>();
         taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
index a2687eaaf..9010a07a5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -61,7 +62,8 @@ public class TransformSeaTunnelTask extends SeaTunnelTask {
     protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(
             SourceAction<?, ?, ?> sourceAction,
             SourceConfig config,
-            CompletableFuture<Void> completableFuture) {
+            CompletableFuture<Void> completableFuture,
+            MetricsContext metricsContext) {
         throw new UnsupportedOperationException(
                 "TransformSeaTunnelTask can't create SourceFlowLifeCycle");
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index 171bb7378..c5021c38f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.context;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -36,10 +37,15 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends 
SourceSplit>
 
     private final SourceSplitEnumeratorTask<SplitT> task;
 
+    private final MetricsContext metricsContext;
+
     public SeaTunnelSplitEnumeratorContext(
-            int parallelism, SourceSplitEnumeratorTask<SplitT> task) {
+            int parallelism,
+            SourceSplitEnumeratorTask<SplitT> task,
+            MetricsContext metricsContext) {
         this.parallelism = parallelism;
         this.task = task;
+        this.metricsContext = metricsContext;
     }
 
     @Override
@@ -74,4 +80,9 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
 
     @Override
     public void sendEventToSourceReader(int subtaskId, SourceEvent event) {}
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        return metricsContext;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
index b23090bd9..59cfa3342 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
@@ -17,19 +17,27 @@
 
 package org.apache.seatunnel.engine.server.task.context;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 
 public class SinkWriterContext implements SinkWriter.Context {
 
     private static final long serialVersionUID = -3082515319043725121L;
     private final int indexID;
+    private final MetricsContext metricsContext;
 
-    public SinkWriterContext(int indexID) {
+    public SinkWriterContext(int indexID, MetricsContext metricsContext) {
         this.indexID = indexID;
+        this.metricsContext = metricsContext;
     }
 
     @Override
     public int getIndexOfSubtask() {
         return indexID;
     }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        return metricsContext;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
index 675a342cc..3a5269305 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.context;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -30,11 +31,17 @@ public class SourceReaderContext implements SourceReader.Context {
 
     private final SourceFlowLifeCycle<?, ?> sourceActionLifeCycle;
 
+    private final MetricsContext metricsContext;
+
     public SourceReaderContext(
-            int index, Boundedness boundedness, SourceFlowLifeCycle<?, ?> sourceActionLifeCycle) {
+            int index,
+            Boundedness boundedness,
+            SourceFlowLifeCycle<?, ?> sourceActionLifeCycle,
+            MetricsContext metricsContext) {
         this.index = index;
         this.boundedness = boundedness;
         this.sourceActionLifeCycle = sourceActionLifeCycle;
+        this.metricsContext = metricsContext;
     }
 
     @Override
@@ -61,4 +68,9 @@ public class SourceReaderContext implements SourceReader.Context {
     public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
         sourceActionLifeCycle.sendSourceEventToEnumerator(sourceEvent);
     }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        return metricsContext;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 182d7721c..308642e0c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
-import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
@@ -27,7 +29,6 @@ import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
 import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -77,6 +78,10 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
 
     private MetricsContext metricsContext;
 
+    private Counter sinkWriteCount;
+
+    private Meter sinkWriteQPS;
+
     private final boolean containAggCommitter;
 
     public SinkFlowLifeCycle(
@@ -95,6 +100,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
         this.committerTaskLocation = committerTaskLocation;
         this.containAggCommitter = containAggCommitter;
         this.metricsContext = metricsContext;
+        sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT);
+        sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS);
     }
 
     @Override
@@ -190,8 +197,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
                     return;
                 }
                 writer.write((T) record.getData());
-                metricsContext.threadSafeQpsMetric(SINK_WRITE_QPS, Unit.COUNT).increment();
-                metricsContext.threadSafeMetric(SINK_WRITE_COUNT, Unit.COUNT).increment();
+                sinkWriteCount.inc();
+                sinkWriteQPS.markEvent();
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -231,10 +238,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
                             .collect(Collectors.toList());
         }
         if (states.isEmpty()) {
-            this.writer = sinkAction.getSink().createWriter(new SinkWriterContext(indexID));
+            this.writer =
+                    sinkAction
+                            .getSink()
+                            .createWriter(new SinkWriterContext(indexID, metricsContext));
         } else {
             this.writer =
-                    sinkAction.getSink().restoreWriter(new SinkWriterContext(indexID), states);
+                    sinkAction
+                            .getSink()
+                            .restoreWriter(new SinkWriterContext(indexID, metricsContext), states);
         }
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b3154172e..a710e579a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -73,18 +74,22 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
 
     private SeaTunnelSourceCollector<T> collector;
 
+    private final MetricsContext metricsContext;
+
     public SourceFlowLifeCycle(
             SourceAction<T, SplitT, ?> sourceAction,
             int indexID,
             TaskLocation enumeratorTaskLocation,
             SeaTunnelTask runningTask,
             TaskLocation currentTaskLocation,
-            CompletableFuture<Void> completableFuture) {
+            CompletableFuture<Void> completableFuture,
+            MetricsContext metricsContext) {
         super(sourceAction, runningTask, completableFuture);
         this.sourceAction = sourceAction;
         this.indexID = indexID;
         this.enumeratorTaskLocation = enumeratorTaskLocation;
         this.currentTaskLocation = currentTaskLocation;
+        this.metricsContext = metricsContext;
     }
 
     public void setCollector(SeaTunnelSourceCollector<T> collector) {
@@ -99,7 +104,10 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
                         .getSource()
                         .createReader(
                                 new SourceReaderContext(
-                                        indexID, sourceAction.getSource().getBoundedness(), this));
+                                        indexID,
+                                        sourceAction.getSource().getBoundedness(),
+                                        this,
+                                        metricsContext));
         this.enumeratorTaskAddress = getEnumeratorTaskAddress();
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
index 15c5c8ca3..a10924438 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.source;
 
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -57,4 +59,11 @@ public class CoordinatedEnumeratorContext<SplitT extends SourceSplit>
     public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
         coordinatedSource.handleEnumeratorEvent(subtaskId, event);
     }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        // TODO Waiting for Flink and Spark to implement MetricsContext
+        // https://github.com/apache/incubator-seatunnel/issues/3431
+        return new AbstractMetricsContext() {};
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
index 70180c764..4cbdf7e13 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.source;
 
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -60,4 +62,11 @@ public class CoordinatedReaderContext implements SourceReader.Context {
     public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
         coordinatedSource.handleReaderEvent(subtaskId, sourceEvent);
     }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        // TODO Waiting for Flink and Spark to implement MetricsContext
+        // https://github.com/apache/incubator-seatunnel/issues/3431
+        return new AbstractMetricsContext() {};
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
index cd09ad0b3..f42c14d03 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.source;
 
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -73,4 +75,11 @@ public class ParallelEnumeratorContext<SplitT extends SourceSplit>
         // TODO: exception
         throw new RuntimeException("");
     }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        // TODO Waiting for Flink and Spark to implement MetricsContext
+        // https://github.com/apache/incubator-seatunnel/issues/3431
+        return new AbstractMetricsContext() {};
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
index 719a86fa9..45ae29767 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.source;
 
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -59,4 +61,11 @@ public class ParallelReaderContext implements SourceReader.Context {
         // TODO: exception
         throw new RuntimeException("");
     }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        // TODO Waiting for Flink and Spark to implement MetricsContext
+        // https://github.com/apache/incubator-seatunnel/issues/3431
+        return new AbstractMetricsContext() {};
+    }
 }

Reply via email to