This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 32e1f91c7 [Feature][API] Add Metrics for Connector-V2 (#4017)
32e1f91c7 is described below
commit 32e1f91c7a315e1d2718bf2380e841bcbeef4ba4
Author: ic4y <[email protected]>
AuthorDate: Wed Feb 15 22:49:55 2023 +0800
[Feature][API] Add Metrics for Connector-V2 (#4017)
* [Feature][API] add Metrics for Connector V2
* [Feature][API] add Metrics for Connector V2
* fix merge conflict
* fix merge conflict
* fix merge conflict
* fix Elasticsearch test container start timeOut
* fix checkstyle
---
.../api/common/metrics/AbstractMetricsContext.java | 82 ++++++
.../common/metrics/{Metric.java => Counter.java} | 43 +--
.../apache/seatunnel/api/common/metrics/Meter.java | 36 ++-
.../seatunnel/api/common/metrics/Metric.java | 19 +-
.../api/common/metrics/MetricsContext.java | 57 ++++
.../api/common/metrics/ThreadSafeCounter.java | 81 ++++++
.../api/common/metrics/ThreadSafeQPSMeter.java | 85 ++++++
.../api/sink/DefaultSinkWriterContext.java | 10 +
.../org/apache/seatunnel/api/sink/SinkWriter.java | 5 +
.../apache/seatunnel/api/source/SourceReader.java | 4 +
.../api/source/SourceSplitEnumerator.java | 4 +
.../cdc/base/source/IncrementalSource.java | 8 +-
.../reader/IncrementalSourceRecordEmitter.java | 35 ++-
.../engine/server/TaskExecutionService.java | 4 +-
.../seatunnel/engine/server/execution/Task.java | 2 +-
.../server/execution/TaskExecutionContext.java | 8 +-
.../seatunnel/engine/server/master/JobMaster.java | 4 +-
.../engine/server/metrics/MetricsContext.java | 314 ---------------------
.../server/metrics/SeaTunnelMetricsContext.java | 67 +++++
.../engine/server/task/CoordinatorTask.java | 30 ++
.../server/task/SeaTunnelSourceCollector.java | 16 +-
.../engine/server/task/SeaTunnelTask.java | 13 +-
.../engine/server/task/SourceSeaTunnelTask.java | 7 +-
.../server/task/SourceSplitEnumeratorTask.java | 3 +-
.../engine/server/task/TransformSeaTunnelTask.java | 4 +-
.../context/SeaTunnelSplitEnumeratorContext.java | 13 +-
.../server/task/context/SinkWriterContext.java | 10 +-
.../server/task/context/SourceReaderContext.java | 14 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 24 +-
.../server/task/flow/SourceFlowLifeCycle.java | 12 +-
.../source/CoordinatedEnumeratorContext.java | 9 +
.../source/CoordinatedReaderContext.java | 9 +
.../source/ParallelEnumeratorContext.java | 9 +
.../translation/source/ParallelReaderContext.java | 9 +
34 files changed, 648 insertions(+), 402 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/AbstractMetricsContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/AbstractMetricsContext.java
new file mode 100644
index 000000000..a8ce35485
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/AbstractMetricsContext.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base {@link MetricsContext} that keeps every registered {@link Metric} in a concurrent map
+ * keyed by metric name.
+ *
+ * <p>A name is bound to at most one metric: the first registration wins, and later attempts to
+ * register another metric under the same name are logged and ignored.
+ */
+@Slf4j
+public abstract class AbstractMetricsContext implements MetricsContext, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Registered metrics, keyed by metric name. */
+    protected final Map<String, Metric> metrics = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the counter registered under {@code name}, registering a new {@link
+     * ThreadSafeCounter} first when the name is not bound yet.
+     *
+     * @param name name of the counter
+     * @return the counter stored under {@code name}
+     * @throws ClassCastException if {@code name} is already bound to a metric that is not a
+     *     {@link Counter}
+     */
+    @Override
+    public Counter counter(String name) {
+        Metric existing = metrics.get(name);
+        if (existing != null) {
+            return (Counter) existing;
+        }
+        Counter created = new ThreadSafeCounter(name);
+        this.counter(name, created);
+        // Another thread may have bound the name between the lookup and the registration. Hand
+        // back the instance that is actually stored so the caller never updates an unreported one.
+        Metric registered = metrics.get(name);
+        return registered == null ? created : (Counter) registered;
+    }
+
+    /**
+     * Registers {@code counter} under {@code name}.
+     *
+     * @param name name of the counter
+     * @param counter counter to register
+     * @param <C> counter type
+     * @return the given counter, also when the name was already taken and the registration was
+     *     ignored
+     */
+    @Override
+    public <C extends Counter> C counter(String name, C counter) {
+        this.addMetric(name, counter);
+        return counter;
+    }
+
+    /**
+     * Returns the meter registered under {@code name}, registering a new {@link
+     * ThreadSafeQPSMeter} first when the name is not bound yet.
+     *
+     * @param name name of the meter
+     * @return the meter stored under {@code name}
+     * @throws ClassCastException if {@code name} is already bound to a metric that is not a
+     *     {@link Meter}
+     */
+    @Override
+    public Meter meter(String name) {
+        Metric existing = metrics.get(name);
+        if (existing != null) {
+            return (Meter) existing;
+        }
+        Meter created = new ThreadSafeQPSMeter(name);
+        this.meter(name, created);
+        // Same reasoning as in counter(String): prefer the instance that won the registration.
+        Metric registered = metrics.get(name);
+        return registered == null ? created : (Meter) registered;
+    }
+
+    /**
+     * Registers {@code meter} under {@code name}.
+     *
+     * @param name name of the meter
+     * @param meter meter to register
+     * @param <M> meter type
+     * @return the given meter, also when the name was already taken and the registration was
+     *     ignored
+     */
+    @Override
+    public <M extends Meter> M meter(String name, M meter) {
+        this.addMetric(name, meter);
+        return meter;
+    }
+
+    /**
+     * Stores {@code metric} under {@code name} unless the name is already bound.
+     *
+     * <p>A {@code null} metric and a name collision are both logged as warnings and otherwise
+     * ignored; on a collision the previously registered metric stays in place.
+     *
+     * @param name name to register the metric under
+     * @param metric metric to register, may be {@code null}
+     */
+    protected void addMetric(String name, Metric metric) {
+        if (metric == null) {
+            log.warn("Ignoring attempted add of a metric due to being null for name {}.", name);
+            return;
+        }
+        // putIfAbsent is a single atomic step on ConcurrentHashMap, so concurrent readers never
+        // observe a metric that is about to be rolled back.
+        Metric prior = this.metrics.putIfAbsent(name, metric);
+        if (prior != null) {
+            log.warn(
+                    "Name collision: MetricsContext already contains a Metric with the name '{}'. "
+                            + "Metric will not be reported.",
+                    name);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "AbstractMetricsContext{" + "metrics=" + metrics + '}';
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Counter.java
similarity index 52%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Counter.java
index 97127fda4..0bd657b02 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Counter.java
@@ -17,31 +17,36 @@
package org.apache.seatunnel.api.common.metrics;
-public interface Metric {
+/** A Counter is a {@link Metric} that measures a count. */
+public interface Counter extends Metric {
- /** Returns the name of the associated metric. */
- String name();
+ /** Increment the current count by 1. */
+ void inc();
/**
- * Return the measurement unit for the associated metric. Meant to provide
further information
- * on the type of value measured by the user-defined metric. Doesn't
affect the functionality of
- * the metric, it still remains a simple numeric value, but is used to
populate the {@link
- * MetricTags#UNIT} tag in the metric's description.
+ * Increment the current count by the given value.
+ *
+ * @param n value to increment the current count by
*/
- Unit unit();
+ void inc(long n);
- /** Increments the current value by 1. */
- void increment();
+ /** Decrement the current count by 1. */
+ void dec();
- /** Increments the current value by the specified amount. */
- void increment(long amount);
-
- /** Decrements the current value by 1. */
- void decrement();
-
- /** Decrements the current value by the specified amount. */
- void decrement(long amount);
+ /**
+ * Decrement the current count by the given value.
+ *
+ * @param n value to decrement the current count by
+ */
+ void dec(long n);
/** Sets the current value. */
- void set(long newValue);
+ void set(long n);
+
+ /**
+ * Returns the current count.
+ *
+ * @return current count
+ */
+ long getCount();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Meter.java
similarity index 56%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Meter.java
index b23090bd9..69b4fdf50 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Meter.java
@@ -15,21 +15,31 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.context;
+package org.apache.seatunnel.api.common.metrics;
-import org.apache.seatunnel.api.sink.SinkWriter;
+/** Metric for measuring throughput. */
+public interface Meter extends Metric {
+ /** Mark occurrence of an event. */
+ void markEvent();
-public class SinkWriterContext implements SinkWriter.Context {
+ /**
+ * Mark occurrence of multiple events.
+ *
+ * @param n number of events occurred
+ */
+ void markEvent(long n);
- private static final long serialVersionUID = -3082515319043725121L;
- private final int indexID;
+ /**
+ * Returns the current rate of events per second.
+ *
+ * @return current rate of events per second
+ */
+ double getRate();
- public SinkWriterContext(int indexID) {
- this.indexID = indexID;
- }
-
- @Override
- public int getIndexOfSubtask() {
- return indexID;
- }
+ /**
+ * Get number of events marked on the meter.
+ *
+ * @return number of events marked on the meter
+ */
+ long getCount();
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
index 97127fda4..55b1cdee4 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.api.common.metrics;
-public interface Metric {
+import java.io.Serializable;
+
+public interface Metric extends Serializable {
/** Returns the name of the associated metric. */
String name();
@@ -29,19 +31,4 @@ public interface Metric {
* MetricTags#UNIT} tag in the metric's description.
*/
Unit unit();
-
- /** Increments the current value by 1. */
- void increment();
-
- /** Increments the current value by the specified amount. */
- void increment(long amount);
-
- /** Decrements the current value by 1. */
- void decrement();
-
- /** Decrements the current value by the specified amount. */
- void decrement(long amount);
-
- /** Sets the current value. */
- void set(long newValue);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricsContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricsContext.java
new file mode 100644
index 000000000..a8875d91d
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricsContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Registry through which connectors obtain and register {@link Counter} and {@link Meter}
+ * metrics by name.
+ */
+public interface MetricsContext {
+
+ /**
+ * Returns a {@link Counter} for the given name.
+ *
+ * <p>{@link AbstractMetricsContext} returns the metric already registered under the name if
+ * there is one, and otherwise registers a new {@link ThreadSafeCounter}.
+ *
+ * @param name name of the counter
+ * @return the counter for {@code name}
+ */
+ Counter counter(String name);
+
+ /**
+ * Registers a {@link Counter} with SeaTunnel.
+ *
+ * <p>In {@link AbstractMetricsContext} a name that is already taken keeps its existing metric;
+ * the collision is logged and the given counter is still returned.
+ *
+ * @param name name of the counter
+ * @param counter counter to register
+ * @param <C> counter type
+ * @return the given counter
+ */
+ <C extends Counter> C counter(String name, C counter);
+
+ /**
+ * Returns a {@link Meter} for the given name.
+ *
+ * <p>{@link AbstractMetricsContext} returns the metric already registered under the name if
+ * there is one, and otherwise registers a new {@link ThreadSafeQPSMeter}.
+ *
+ * @param name name of the meter
+ * @return the meter for {@code name}
+ */
+ Meter meter(String name);
+
+ /**
+ * Registers a {@link Meter} with SeaTunnel.
+ *
+ * <p>In {@link AbstractMetricsContext} a name that is already taken keeps its existing metric;
+ * the collision is logged and the given meter is still returned.
+ *
+ * @param name name of the meter
+ * @param meter meter to register
+ * @param <M> meter type
+ * @return the given meter
+ */
+ <M extends Meter> M meter(String name, M meter);
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeCounter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeCounter.java
new file mode 100644
index 000000000..7094bc429
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeCounter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * {@link Counter} whose count lives in a {@code volatile long} field that is modified atomically
+ * through an {@link AtomicLongFieldUpdater}.
+ */
+public class ThreadSafeCounter implements Counter, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Performs the atomic read-modify-write operations on {@link #value}. */
+    private static final AtomicLongFieldUpdater<ThreadSafeCounter> VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ThreadSafeCounter.class, "value");
+
+    private final String name;
+
+    private volatile long value;
+
+    /**
+     * Creates a counter starting at zero.
+     *
+     * @param name name reported by {@link #name()}
+     */
+    public ThreadSafeCounter(String name) {
+        this.name = name;
+    }
+
+    /** Adds one to the count. */
+    @Override
+    public void inc() {
+        VOLATILE_VALUE_UPDATER.addAndGet(this, 1L);
+    }
+
+    /**
+     * Adds {@code n} to the count.
+     *
+     * @param n amount to add
+     */
+    @Override
+    public void inc(long n) {
+        VOLATILE_VALUE_UPDATER.getAndAdd(this, n);
+    }
+
+    /** Subtracts one from the count. */
+    @Override
+    public void dec() {
+        VOLATILE_VALUE_UPDATER.addAndGet(this, -1L);
+    }
+
+    /**
+     * Subtracts {@code n} from the count.
+     *
+     * @param n amount to subtract
+     */
+    @Override
+    public void dec(long n) {
+        VOLATILE_VALUE_UPDATER.getAndAdd(this, -n);
+    }
+
+    /**
+     * Overwrites the count.
+     *
+     * @param n new count
+     */
+    @Override
+    public void set(long n) {
+        // A plain assignment to the volatile field is the same volatile write the updater does.
+        this.value = n;
+    }
+
+    /** @return the current count */
+    @Override
+    public long getCount() {
+        return this.value;
+    }
+
+    /** @return the name given at construction */
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    /** @return always {@link Unit#COUNT} */
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("ThreadSafeCounter{");
+        text.append("name='").append(name).append('\'');
+        text.append(", value=").append(value);
+        return text.append('}').toString();
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeQPSMeter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeQPSMeter.java
new file mode 100644
index 000000000..627e9bd4c
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/ThreadSafeQPSMeter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * {@link Meter} that counts marked events in a {@code volatile long} updated through an {@link
+ * AtomicLongFieldUpdater} and reports the average event rate since the meter was created.
+ */
+public class ThreadSafeQPSMeter implements Meter, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final long MILLIS_PER_SECOND = 1000L;
+
+    private static final AtomicLongFieldUpdater<ThreadSafeQPSMeter> VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ThreadSafeQPSMeter.class, "value");
+
+    private final String name;
+
+    /** Number of events marked so far. */
+    private volatile long value;
+
+    /** Wall-clock creation time in epoch milliseconds; the start of the rate window. */
+    private final long timestamp;
+
+    /**
+     * Creates a meter with no events marked and starts its rate window at the current time.
+     *
+     * @param name name reported by {@link #name()}
+     */
+    public ThreadSafeQPSMeter(String name) {
+        this.name = name;
+        timestamp = System.currentTimeMillis();
+    }
+
+    /** Marks a single event. */
+    @Override
+    public void markEvent() {
+        VOLATILE_VALUE_UPDATER.incrementAndGet(this);
+    }
+
+    /**
+     * Marks {@code n} events at once.
+     *
+     * @param n number of events that occurred
+     */
+    @Override
+    public void markEvent(long n) {
+        VOLATILE_VALUE_UPDATER.addAndGet(this, n);
+    }
+
+    /**
+     * Returns the average number of events per second since this meter was created.
+     *
+     * @return events per second, or {@code 0.0} when no measurable time has elapsed yet
+     */
+    @Override
+    public double getRate() {
+        long elapsedMillis = System.currentTimeMillis() - timestamp;
+        // Within the creation millisecond, or after the wall clock stepped backwards, the
+        // division would yield Infinity, NaN or a negative rate; report "no rate yet" instead.
+        if (elapsedMillis <= 0) {
+            return 0.0;
+        }
+        return (double) value * MILLIS_PER_SECOND / elapsedMillis;
+    }
+
+    /** @return number of events marked on this meter */
+    @Override
+    public long getCount() {
+        return VOLATILE_VALUE_UPDATER.get(this);
+    }
+
+    /** @return the name given at construction */
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /** @return always {@link Unit#COUNT} */
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreadSafeQPSMeter{"
+                + "name='"
+                + name
+                + '\''
+                + ", value="
+                + value
+                + ", timestamp="
+                + timestamp
+                + '}';
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index 05fd64034..43a92f35f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.api.sink;
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
/** The default {@link SinkWriter.Context} implement class. */
public class DefaultSinkWriterContext implements SinkWriter.Context {
private final int subtask;
@@ -29,4 +32,11 @@ public class DefaultSinkWriterContext implements
SinkWriter.Context {
public int getIndexOfSubtask() {
return subtask;
}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ // TODO Waiting for Flink and Spark to implement MetricsContext
+ // https://github.com/apache/incubator-seatunnel/issues/3431
+ return new AbstractMetricsContext() {};
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index e6996ec83..c0fbe2c02 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.api.sink;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
@@ -79,5 +81,8 @@ public interface SinkWriter<T, CommitInfoT, StateT> {
/** @return The index of this subtask. */
int getIndexOfSubtask();
+
+ /** @return metricsContext of this writer. */
+ MetricsContext getMetricsContext();
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index bb98308ff..50ec0c137 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.source;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.state.CheckpointListener;
import java.io.IOException;
@@ -107,5 +108,8 @@ public interface SourceReader<T, SplitT extends SourceSplit>
* @param sourceEvent the source event to coordinator.
*/
void sendSourceEventToEnumerator(SourceEvent sourceEvent);
+
+ /** @return metricsContext of this reader. */
+ MetricsContext getMetricsContext();
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 7acf6d677..49d3c7c1f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.source;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.state.CheckpointListener;
import java.io.IOException;
@@ -116,5 +117,8 @@ public interface SourceSplitEnumerator<SplitT extends
SourceSplit, StateT>
* @param event the source event to send.
*/
void sendEventToSourceReader(int subtaskId, SourceEvent event);
+
+ /** @return metricsContext of this enumerator. */
+ MetricsContext getMetricsContext();
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 5c56a73fe..4cd707da4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.cdc.base.source;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -142,15 +143,16 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
return new IncrementalSourceReader<>(
elementsQueue,
splitReaderSupplier,
- createRecordEmitter(sourceConfig),
+ createRecordEmitter(sourceConfig,
readerContext.getMetricsContext()),
new SourceReaderOptions(readonlyConfig),
readerContext,
sourceConfig);
}
protected RecordEmitter<SourceRecords, T, SourceSplitStateBase>
createRecordEmitter(
- SourceConfig sourceConfig) {
- return new IncrementalSourceRecordEmitter<>(deserializationSchema,
offsetFactory);
+ SourceConfig sourceConfig, MetricsContext metricsContext) {
+ return new IncrementalSourceRecordEmitter<>(
+ deserializationSchema, offsetFactory, metricsContext);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index a0969ac02..49eb17ede 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.cdc.base.source.reader;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
@@ -35,6 +37,8 @@ import java.util.Map;
import static
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isHighWatermarkEvent;
import static
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent;
+import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getFetchTimestamp;
+import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getMessageTimestamp;
import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isDataChangeRecord;
import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isSchemaChangeEvent;
@@ -48,17 +52,26 @@ import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.i
public class IncrementalSourceRecordEmitter<T>
implements RecordEmitter<SourceRecords, T, SourceSplitStateBase> {
+ private static final String CDC_RECORD_FETCH_DELAY = "CDCRecordFetchDelay";
+ private static final String CDC_RECORD_EMIT_DELAY = "CDCRecordEmitDelay";
+
protected final DebeziumDeserializationSchema<T>
debeziumDeserializationSchema;
protected final OutputCollector<T> outputCollector;
protected final OffsetFactory offsetFactory;
+ protected final Counter recordFetchDelay;
+ protected final Counter recordEmitDelay;
+
public IncrementalSourceRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
- OffsetFactory offsetFactory) {
+ OffsetFactory offsetFactory,
+ MetricsContext metricsContext) {
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.outputCollector = new OutputCollector<>();
this.offsetFactory = offsetFactory;
+ this.recordFetchDelay = metricsContext.counter(CDC_RECORD_FETCH_DELAY);
+ this.recordEmitDelay = metricsContext.counter(CDC_RECORD_EMIT_DELAY);
}
@Override
@@ -67,7 +80,25 @@ public class IncrementalSourceRecordEmitter<T>
throws Exception {
final Iterator<SourceRecord> elementIterator =
sourceRecords.iterator();
while (elementIterator.hasNext()) {
- processElement(elementIterator.next(), collector, splitState);
+ SourceRecord next = elementIterator.next();
+ reportMetrics(next);
+ processElement(next, collector, splitState);
+ }
+ }
+
+ protected void reportMetrics(SourceRecord element) {
+ long now = System.currentTimeMillis();
+ // record the latest process time
+ Long messageTimestamp = getMessageTimestamp(element);
+
+ if (messageTimestamp != null && messageTimestamp > 0L) {
+ // report fetch delay
+ Long fetchTimestamp = getFetchTimestamp(element);
+ if (fetchTimestamp != null) {
+ recordFetchDelay.set(fetchTimestamp - messageTimestamp);
+ }
+ // report emit delay
+ recordEmitDelay.set(now - messageTimestamp);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 089b27aeb..c221a78e8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -36,7 +36,7 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
@@ -432,7 +432,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
contextMap.putAll(executionContexts);
contextMap.putAll(finishedExecutionContexts);
try {
- IMap<TaskLocation, MetricsContext> map =
+ IMap<TaskLocation, SeaTunnelMetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
contextMap.forEach(
(taskGroupLocation, taskGroupContext) -> {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 820ba6abc..58afa695d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -17,10 +17,10 @@
package org.apache.seatunnel.engine.server.execution;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.Stateful;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 7d51803dc..bf4434544 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.execution;
import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
@@ -50,10 +50,10 @@ public class TaskExecutionContext {
return nodeEngine.getLogger(task.getClass());
}
- public MetricsContext getOrCreateMetricsContext(TaskLocation taskLocation)
{
- IMap<TaskLocation, MetricsContext> map =
+ public SeaTunnelMetricsContext getOrCreateMetricsContext(TaskLocation
taskLocation) {
+ IMap<TaskLocation, SeaTunnelMetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- return map.computeIfAbsent(taskLocation, k -> new MetricsContext());
+ return map.computeIfAbsent(taskLocation, k -> new
SeaTunnelMetricsContext());
}
public <T> T getTask() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 12a54359a..21e11d992 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -50,7 +50,7 @@ import
org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -488,7 +488,7 @@ public class JobMaster {
PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd()
|| pipelineStatus.equals(PipelineStatus.CANCELED)) {
- IMap<TaskLocation, MetricsContext> map =
+ IMap<TaskLocation, SeaTunnelMetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
map.keySet().stream()
.filter(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
deleted file mode 100644
index 40023968d..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.metrics;
-
-import org.apache.seatunnel.api.common.metrics.Metric;
-import org.apache.seatunnel.api.common.metrics.Unit;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
-
-import com.hazelcast.internal.metrics.DynamicMetricsProvider;
-import com.hazelcast.internal.metrics.MetricDescriptor;
-import com.hazelcast.internal.metrics.MetricsCollectionContext;
-import com.hazelcast.internal.metrics.ProbeLevel;
-import com.hazelcast.internal.metrics.ProbeUnit;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.function.BiFunction;
-
-public class MetricsContext implements DynamicMetricsProvider, Serializable {
-
- private static final BiFunction<String, Unit, AbstractMetric> CREATE_SINGLE_WRITER_METRIC =
- SingleWriterMetric::new;
- private static final BiFunction<String, Unit, AbstractMetric> CREATE_THREAD_SAFE_METRICS =
- ThreadSafeMetric::new;
- private static final BiFunction<String, Unit, AbstractMetric> CREATE_SINGLE_WRITER_QPS_METRIC =
- SingleWriterQPSMetric::new;
- private static final BiFunction<String, Unit, AbstractMetric> CREATE_THREAD_SAFE_QPS_METRIC =
- ThreadSafeQPSMetric::new;
-
- private volatile Map<String, AbstractMetric> metrics;
-
- public Metric metric(String name, Unit unit) {
- return metric(name, unit, CREATE_SINGLE_WRITER_METRIC);
- }
-
- public Metric qpsMetric(String name, Unit unit) {
- return metric(name, unit, CREATE_SINGLE_WRITER_QPS_METRIC);
- }
-
- public Metric threadSafeMetric(String name, Unit unit) {
- return metric(name, unit, CREATE_THREAD_SAFE_METRICS);
- }
-
- public Metric threadSafeQpsMetric(String name, Unit unit) {
- return metric(name, unit, CREATE_THREAD_SAFE_QPS_METRIC);
- }
-
- private Metric metric(
- String name, Unit unit, BiFunction<String, Unit, AbstractMetric> metricSupplier) {
- if (metrics == null) { // first metric being stored
- metrics = new ConcurrentHashMap<>();
- }
-
- AbstractMetric metric = metrics.get(name);
- if (metric != null) {
- return metric;
- }
-
- metric = metricSupplier.apply(name, unit);
- metrics.put(name, metric);
-
- return metric;
- }
-
- @Override
- public void provideDynamicMetrics(MetricDescriptor tagger, MetricsCollectionContext context) {
- if (metrics != null) {
- metrics.forEach(
- (name, metric) -> {
- if (metric.get() instanceof Long) {
- context.collect(
- tagger.copy(),
- name,
- ProbeLevel.INFO,
- toProbeUnit(metric.unit()),
- (Long) metric.get());
- } else if (metric.get() instanceof Double) {
- context.collect(
- tagger.copy(),
- name,
- ProbeLevel.INFO,
- toProbeUnit(metric.unit()),
- (Double) metric.get());
- } else {
- throw new SeaTunnelException(
- "The value of Metric does not support "
- + metric.get().getClass().getSimpleName()
- + " data type");
- }
- });
- }
- }
-
- private ProbeUnit toProbeUnit(Unit unit) {
- return ProbeUnit.valueOf(unit.name());
- }
-
- private abstract static class AbstractMetric implements Metric, Serializable {
-
- private final String name;
- private final Unit unit;
-
- AbstractMetric(String name, Unit unit) {
- this.name = name;
- this.unit = unit;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public Unit unit() {
- return unit;
- }
-
- protected abstract Object get();
- }
-
- private static final class SingleWriterQPSMetric extends AbstractMetric {
-
- private static final AtomicLongFieldUpdater<SingleWriterQPSMetric> VOLATILE_VALUE_UPDATER =
- AtomicLongFieldUpdater.newUpdater(SingleWriterQPSMetric.class, "value");
-
- private volatile long value;
- private final long timestamp;
-
- SingleWriterQPSMetric(String name, Unit unit) {
- super(name, unit);
- timestamp = System.currentTimeMillis();
- }
-
- @Override
- public void set(long newValue) {
- VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
- }
-
- @Override
- public void increment() {
- VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
- }
-
- @Override
- public void increment(long increment) {
- VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
- }
-
- @Override
- public void decrement() {
- VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
- }
-
- @Override
- public void decrement(long decrement) {
- VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
- }
-
- @SuppressWarnings("checkstyle:MagicNumber")
- @Override
- protected Object get() {
- long cost = System.currentTimeMillis() - timestamp;
- return (double) value * 1000 / cost;
- }
- }
-
- private static final class ThreadSafeQPSMetric extends AbstractMetric {
-
- private static final AtomicLongFieldUpdater<ThreadSafeQPSMetric> VOLATILE_VALUE_UPDATER =
- AtomicLongFieldUpdater.newUpdater(ThreadSafeQPSMetric.class, "value");
-
- private volatile long value;
-
- private final long timestamp;
-
- ThreadSafeQPSMetric(String name, Unit unit) {
- super(name, unit);
- timestamp = System.currentTimeMillis();
- }
-
- @Override
- public void increment() {
- VOLATILE_VALUE_UPDATER.incrementAndGet(this);
- }
-
- @Override
- public void increment(long amount) {
- VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
- }
-
- @Override
- public void decrement() {
- VOLATILE_VALUE_UPDATER.decrementAndGet(this);
- }
-
- @Override
- public void decrement(long amount) {
- VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
- }
-
- @Override
- public void set(long newValue) {
- VOLATILE_VALUE_UPDATER.set(this, newValue);
- }
-
- @SuppressWarnings("checkstyle:MagicNumber")
- @Override
- protected Object get() {
- long cost = System.currentTimeMillis() - timestamp;
- return (double) value * 1000 / cost;
- }
- }
-
- private static final class SingleWriterMetric extends AbstractMetric {
-
- private static final AtomicLongFieldUpdater<SingleWriterMetric> VOLATILE_VALUE_UPDATER =
- AtomicLongFieldUpdater.newUpdater(SingleWriterMetric.class, "value");
-
- private volatile long value;
-
- SingleWriterMetric(String name, Unit unit) {
- super(name, unit);
- }
-
- @Override
- public void set(long newValue) {
- VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
- }
-
- @Override
- public void increment() {
- VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
- }
-
- @Override
- public void increment(long increment) {
- VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
- }
-
- @Override
- public void decrement() {
- VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
- }
-
- @Override
- public void decrement(long decrement) {
- VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
- }
-
- @Override
- protected Object get() {
- return value;
- }
- }
-
- private static final class ThreadSafeMetric extends AbstractMetric {
-
- private static final AtomicLongFieldUpdater<ThreadSafeMetric> VOLATILE_VALUE_UPDATER =
- AtomicLongFieldUpdater.newUpdater(ThreadSafeMetric.class, "value");
-
- private volatile long value;
-
- ThreadSafeMetric(String name, Unit unit) {
- super(name, unit);
- }
-
- @Override
- public void increment() {
- VOLATILE_VALUE_UPDATER.incrementAndGet(this);
- }
-
- @Override
- public void increment(long amount) {
- VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
- }
-
- @Override
- public void decrement() {
- VOLATILE_VALUE_UPDATER.decrementAndGet(this);
- }
-
- @Override
- public void decrement(long amount) {
- VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
- }
-
- @Override
- public void set(long newValue) {
- VOLATILE_VALUE_UPDATER.set(this, newValue);
- }
-
- @Override
- protected Object get() {
- return VOLATILE_VALUE_UPDATER.get(this);
- }
- }
-}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/SeaTunnelMetricsContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/SeaTunnelMetricsContext.java
new file mode 100644
index 000000000..2ed763b99
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/SeaTunnelMetricsContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
+import com.hazelcast.internal.metrics.ProbeLevel;
+import com.hazelcast.internal.metrics.ProbeUnit;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SeaTunnelMetricsContext extends AbstractMetricsContext
+ implements DynamicMetricsProvider {
+
+ @Override
+ public void provideDynamicMetrics(MetricDescriptor tagger, MetricsCollectionContext context) {
+ metrics.forEach(
+ (name, metric) -> {
+ if (metric instanceof Counter) {
+ context.collect(
+ tagger.copy(),
+ name,
+ ProbeLevel.INFO,
+ toProbeUnit(metric.unit()),
+ ((Counter) metric).getCount());
+ } else if (metric instanceof Meter) {
+ context.collect(
+ tagger.copy(),
+ name,
+ ProbeLevel.INFO,
+ toProbeUnit(metric.unit()),
+ ((Meter) metric).getRate());
+ } else {
+ throw new SeaTunnelException(
+ "The value of Metric does not support "
+ + metric.getClass().getSimpleName()
+ + " data type");
+ }
+ });
+ }
+
+ private ProbeUnit toProbeUnit(Unit unit) {
+ return ProbeUnit.valueOf(unit.name());
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
index c4b019cd6..b56266de4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
@@ -17,13 +17,43 @@
package org.apache.seatunnel.engine.server.task;
+import org.apache.seatunnel.api.common.metrics.MetricTags;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
public abstract class CoordinatorTask extends AbstractTask {
private static final long serialVersionUID = -3957168748281681077L;
+ private SeaTunnelMetricsContext metricsContext;
+
public CoordinatorTask(long jobID, TaskLocation taskID) {
super(jobID, taskID);
}
+
+ @Override
+ public void init() throws Exception {
+ super.init();
+ metricsContext = getExecutionContext().getOrCreateMetricsContext(taskLocation);
+ }
+
+ @Override
+ public SeaTunnelMetricsContext getMetricsContext() {
+ return metricsContext;
+ }
+
+ @Override
+ public void provideDynamicMetrics(
+ MetricDescriptor descriptor, MetricsCollectionContext context) {
+ if (null != metricsContext) {
+ metricsContext.provideDynamicMetrics(
+ descriptor
+ .copy()
+ .withTag(MetricTags.TASK_NAME, this.getClass().getSimpleName()),
+ context);
+ }
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 03ff826d7..7ebcdf0d8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -17,10 +17,11 @@
package org.apache.seatunnel.engine.server.task;
-import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import java.io.IOException;
@@ -35,7 +36,9 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
- private final MetricsContext metricsContext;
+ private final Counter sourceReceivedCount;
+
+ private final Meter sourceReceivedQPS;
public SeaTunnelSourceCollector(
Object checkpointLock,
@@ -43,15 +46,16 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
MetricsContext metricsContext) {
this.checkpointLock = checkpointLock;
this.outputs = outputs;
- this.metricsContext = metricsContext;
+ sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
+ sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
}
@Override
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
- metricsContext.threadSafeQpsMetric(SOURCE_RECEIVED_QPS, Unit.COUNT).increment();
- metricsContext.threadSafeMetric(SOURCE_RECEIVED_COUNT, Unit.COUNT).increment();
+ sourceReceivedCount.inc();
+ sourceReceivedQPS.markEvent();
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index e8418c71d..942514055 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.common.metrics.MetricTags;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
@@ -40,7 +41,7 @@ import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlo
import org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
@@ -108,7 +109,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
private TaskGroup taskBelongGroup;
- private MetricsContext metricsContext;
+ private SeaTunnelMetricsContext metricsContext;
public SeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) {
super(jobID, taskID);
@@ -208,7 +209,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
createSourceFlowLifeCycle(
(SourceAction<?, ?, ?>) f.getAction(),
(SourceConfig) f.getConfig(),
- completableFuture);
+ completableFuture,
+ this.getMetricsContext());
outputs = flowLifeCycles;
} else if (f.getAction() instanceof SinkAction) {
lifeCycle =
@@ -258,7 +260,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
protected abstract SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(
SourceAction<?, ?, ?> sourceAction,
SourceConfig config,
- CompletableFuture<Void> completableFuture);
+ CompletableFuture<Void> completableFuture,
+ MetricsContext metricsContext);
protected abstract void collect() throws Exception;
@@ -364,7 +367,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
}
@Override
- public MetricsContext getMetricsContext() {
+ public SeaTunnelMetricsContext getMetricsContext() {
return metricsContext;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 0e9b4142d..842cf8a60 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
@@ -66,14 +67,16 @@ public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends SeaTunne
protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(
SourceAction<?, ?, ?> sourceAction,
SourceConfig config,
- CompletableFuture<Void> completableFuture) {
+ CompletableFuture<Void> completableFuture,
+ MetricsContext metricsContext) {
return new SourceFlowLifeCycle<>(
sourceAction,
indexID,
config.getEnumeratorTask(),
this,
taskLocation,
- completableFuture);
+ completableFuture,
+ metricsContext);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 60ab183b9..97bd53405 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -96,7 +96,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
"starting seatunnel source split enumerator task, source name:
"
+ source.getName());
enumeratorContext =
- new SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
+ new SeaTunnelSplitEnumeratorContext<>(
+ this.source.getParallelism(), this, getMetricsContext());
enumeratorStateSerializer =
this.source.getSource().getEnumeratorStateSerializer();
taskMemberMapping = new ConcurrentHashMap<>();
taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
index a2687eaaf..9010a07a5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -61,7 +62,8 @@ public class TransformSeaTunnelTask extends SeaTunnelTask {
protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(
SourceAction<?, ?, ?> sourceAction,
SourceConfig config,
- CompletableFuture<Void> completableFuture) {
+ CompletableFuture<Void> completableFuture,
+ MetricsContext metricsContext) {
throw new UnsupportedOperationException(
"TransformSeaTunnelTask can't create SourceFlowLifeCycle");
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index 171bb7378..c5021c38f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task.context;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -36,10 +37,15 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
private final SourceSplitEnumeratorTask<SplitT> task;
+ private final MetricsContext metricsContext;
+
public SeaTunnelSplitEnumeratorContext(
- int parallelism, SourceSplitEnumeratorTask<SplitT> task) {
+ int parallelism,
+ SourceSplitEnumeratorTask<SplitT> task,
+ MetricsContext metricsContext) {
this.parallelism = parallelism;
this.task = task;
+ this.metricsContext = metricsContext;
}
@Override
@@ -74,4 +80,9 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
@Override
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return metricsContext;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
index b23090bd9..59cfa3342 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
@@ -17,19 +17,27 @@
package org.apache.seatunnel.engine.server.task.context;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.sink.SinkWriter;
public class SinkWriterContext implements SinkWriter.Context {
private static final long serialVersionUID = -3082515319043725121L;
private final int indexID;
+ private final MetricsContext metricsContext;
- public SinkWriterContext(int indexID) {
+ public SinkWriterContext(int indexID, MetricsContext metricsContext) {
this.indexID = indexID;
+ this.metricsContext = metricsContext;
}
@Override
public int getIndexOfSubtask() {
return indexID;
}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return metricsContext;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
index 675a342cc..3a5269305 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SourceReaderContext.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task.context;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
@@ -30,11 +31,17 @@ public class SourceReaderContext implements SourceReader.Context {
private final SourceFlowLifeCycle<?, ?> sourceActionLifeCycle;
+ private final MetricsContext metricsContext;
+
public SourceReaderContext(
- int index, Boundedness boundedness, SourceFlowLifeCycle<?, ?> sourceActionLifeCycle) {
+ int index,
+ Boundedness boundedness,
+ SourceFlowLifeCycle<?, ?> sourceActionLifeCycle,
+ MetricsContext metricsContext) {
this.index = index;
this.boundedness = boundedness;
this.sourceActionLifeCycle = sourceActionLifeCycle;
+ this.metricsContext = metricsContext;
}
@Override
@@ -61,4 +68,9 @@ public class SourceReaderContext implements SourceReader.Context {
public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
sourceActionLifeCycle.sendSourceEventToEnumerator(sourceEvent);
}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return metricsContext;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 182d7721c..308642e0c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.engine.server.task.flow;
-import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
@@ -27,7 +29,6 @@ import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -77,6 +78,10 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
private MetricsContext metricsContext;
+ private Counter sinkWriteCount;
+
+ private Meter sinkWriteQPS;
+
private final boolean containAggCommitter;
public SinkFlowLifeCycle(
@@ -95,6 +100,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
this.committerTaskLocation = committerTaskLocation;
this.containAggCommitter = containAggCommitter;
this.metricsContext = metricsContext;
+ sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT);
+ sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS);
}
@Override
@@ -190,8 +197,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
return;
}
writer.write((T) record.getData());
- metricsContext.threadSafeQpsMetric(SINK_WRITE_QPS, Unit.COUNT).increment();
- metricsContext.threadSafeMetric(SINK_WRITE_COUNT, Unit.COUNT).increment();
+ sinkWriteCount.inc();
+ sinkWriteQPS.markEvent();
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -231,10 +238,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
.collect(Collectors.toList());
}
if (states.isEmpty()) {
- this.writer = sinkAction.getSink().createWriter(new SinkWriterContext(indexID));
+ this.writer =
+ sinkAction
+ .getSink()
+ .createWriter(new SinkWriterContext(indexID, metricsContext));
} else {
this.writer =
- sinkAction.getSink().restoreWriter(new SinkWriterContext(indexID), states);
+ sinkAction
+ .getSink()
+ .restoreWriter(new SinkWriterContext(indexID, metricsContext), states);
}
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b3154172e..a710e579a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task.flow;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
@@ -73,18 +74,22 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
private SeaTunnelSourceCollector<T> collector;
+ private final MetricsContext metricsContext;
+
public SourceFlowLifeCycle(
SourceAction<T, SplitT, ?> sourceAction,
int indexID,
TaskLocation enumeratorTaskLocation,
SeaTunnelTask runningTask,
TaskLocation currentTaskLocation,
- CompletableFuture<Void> completableFuture) {
+ CompletableFuture<Void> completableFuture,
+ MetricsContext metricsContext) {
super(sourceAction, runningTask, completableFuture);
this.sourceAction = sourceAction;
this.indexID = indexID;
this.enumeratorTaskLocation = enumeratorTaskLocation;
this.currentTaskLocation = currentTaskLocation;
+ this.metricsContext = metricsContext;
}
public void setCollector(SeaTunnelSourceCollector<T> collector) {
@@ -99,7 +104,10 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
.getSource()
.createReader(
new SourceReaderContext(
- indexID,
sourceAction.getSource().getBoundedness(), this));
+ indexID,
+
sourceAction.getSource().getBoundedness(),
+ this,
+ metricsContext));
this.enumeratorTaskAddress = getEnumeratorTaskAddress();
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
index 15c5c8ca3..a10924438 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.source;
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -57,4 +59,11 @@ public class CoordinatedEnumeratorContext<SplitT extends
SourceSplit>
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
coordinatedSource.handleEnumeratorEvent(subtaskId, event);
}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ // TODO: per-call placeholder until Flink and Spark implement MetricsContext, see
+ // https://github.com/apache/incubator-seatunnel/issues/3431
+ return new AbstractMetricsContext() {};
+ }
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
index 70180c764..4cbdf7e13 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.source;
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
@@ -60,4 +62,11 @@ public class CoordinatedReaderContext implements
SourceReader.Context {
public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
coordinatedSource.handleReaderEvent(subtaskId, sourceEvent);
}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ // TODO: per-call placeholder until Flink and Spark implement MetricsContext, see
+ // https://github.com/apache/incubator-seatunnel/issues/3431
+ return new AbstractMetricsContext() {};
+ }
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
index cd09ad0b3..f42c14d03 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.source;
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -73,4 +75,11 @@ public class ParallelEnumeratorContext<SplitT extends
SourceSplit>
// TODO: exception
throw new RuntimeException("");
}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ // TODO: per-call placeholder until Flink and Spark implement MetricsContext, see
+ // https://github.com/apache/incubator-seatunnel/issues/3431
+ return new AbstractMetricsContext() {};
+ }
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
index 719a86fa9..45ae29767 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.source;
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
@@ -59,4 +61,11 @@ public class ParallelReaderContext implements
SourceReader.Context {
// TODO: exception
throw new RuntimeException("");
}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ // TODO: per-call placeholder until Flink and Spark implement MetricsContext, see
+ // https://github.com/apache/incubator-seatunnel/issues/3431
+ return new AbstractMetricsContext() {};
+ }
}