This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ae9023759 [INLONG-6912][Sort] Add table level metric for Doris (#7018)
ae9023759 is described below

commit ae902375950bc1eba7ffad5d6d92991bac298a16
Author: kuansix <[email protected]>
AuthorDate: Mon Dec 26 10:20:35 2022 +0800

    [INLONG-6912][Sort] Add table level metric for Doris (#7018)
---
 .../doris/table/DorisDynamicSchemaOutputFormat.java     | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 4bf1574fc..557851cbe 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -50,7 +50,7 @@ import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.doris.model.RespContent;
 import org.apache.inlong.sort.doris.util.DorisParseUtils;
@@ -136,7 +136,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
     private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
-    private transient SinkMetricData metricData;
+    private transient SinkTableMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
     private final String[] fieldNames;
@@ -270,7 +270,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+            metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup());
+            if (multipleSink) {
+                metricData.registerSubMetricsGroup(metricState);
+            }
         }
         if (dirtySink != null) {
             try {
@@ -607,7 +610,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             respContent = load(tableIdentifier, loadValue);
             try {
                 if (null != metricData && null != respContent) {
-                    metricData.invoke(respContent.getNumberLoadedRows(), respContent.getLoadBytes());
+                    if (multipleSink) {
+                        String[] tableWithDb = tableIdentifier.split("\\.");
+                        metricData.outputMetricsWithEstimate(tableWithDb[0], null, tableWithDb[1],
+                                false, respContent.getNumberLoadedRows(), respContent.getLoadBytes());
+                    } else {
+                        metricData.invoke(respContent.getNumberLoadedRows(), respContent.getLoadBytes());
+                    }
                 }
             } catch (Exception e) {
                 LOG.warn("metricData invoke get err:", e);

Reply via email to