This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ae9023759 [INLONG-6912][Sort] Add table level metric for Doris (#7018)
ae9023759 is described below
commit ae902375950bc1eba7ffad5d6d92991bac298a16
Author: kuansix <[email protected]>
AuthorDate: Mon Dec 26 10:20:35 2022 +0800
[INLONG-6912][Sort] Add table level metric for Doris (#7018)
---
.../doris/table/DorisDynamicSchemaOutputFormat.java | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 4bf1574fc..557851cbe 100644
---
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -50,7 +50,7 @@ import
org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.doris.model.RespContent;
import org.apache.inlong.sort.doris.util.DorisParseUtils;
@@ -136,7 +136,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
- private transient SinkMetricData metricData;
+ private transient SinkTableMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
private final String[] fieldNames;
@@ -270,7 +270,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
.withRegisterMetric(MetricOption.RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- metricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ metricData = new SinkTableMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ if (multipleSink) {
+ metricData.registerSubMetricsGroup(metricState);
+ }
}
if (dirtySink != null) {
try {
@@ -607,7 +610,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
respContent = load(tableIdentifier, loadValue);
try {
if (null != metricData && null != respContent) {
- metricData.invoke(respContent.getNumberLoadedRows(),
respContent.getLoadBytes());
+ if (multipleSink) {
+ String[] tableWithDb = tableIdentifier.split("\\.");
+ metricData.outputMetricsWithEstimate(tableWithDb[0],
null, tableWithDb[1],
+ false, respContent.getNumberLoadedRows(),
respContent.getLoadBytes());
+ } else {
+ metricData.invoke(respContent.getNumberLoadedRows(),
respContent.getLoadBytes());
+ }
}
} catch (Exception e) {
LOG.warn("metricData invoke get err:", e);