Copilot commented on code in PR #10273:
URL: https://github.com/apache/seatunnel/pull/10273#discussion_r2670699484


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
             String metricName,
             String tableName,
             JsonNode metricNode,
-            Map<String, JsonNode>[] tableMetricsMaps) {
+            Map<String, JsonNode>[] tableMetricsMaps,
+            Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
         if (metricNode == null) {
             return;
         }
 
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+        if (sinkIdentifiers != null
+                && !sinkIdentifiers.isEmpty()
+                && metricNode.isArray()
+                && sinkIdentifiers.size() > 1) {
+            int arraySize = metricNode.size();
+
+            if (arraySize == sinkIdentifiers.size()) {
+                ObjectMapper mapper = new ObjectMapper();

Review Comment:
   A new ObjectMapper instance is created at line 424 on every call to 
processMetric, i.e. once per metric handled by the enclosing loop in 
getJobMetrics, which is inefficient. ObjectMapper instantiation is relatively 
expensive, and a configured instance is thread-safe, so it should be reused. 
Consider creating it once as a static final (or instance) field instead of 
inside the method, especially since processMetric runs for every table metric 
and every sink identifier.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -248,11 +256,40 @@ protected JsonObject getJobInfoJson(
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
                 .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
                 .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
-                .add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(jobMetrics)));
+                .add(
+                        RestConstant.METRICS,
+                        metricsToJsonObject(getJobMetrics(jobMetrics, jobDAGInfo)));
     }
 
-    private Map<String, Object> getJobMetrics(String jobMetrics) {
+    private Map<String, Object> getJobMetrics(String jobMetrics, JobDAGInfo jobDAGInfo) {
         Map<String, Object> metricsMap = new HashMap<>();
+
+        Map<String, List<String>> tableToSourceIdentifiersMap = new HashMap<>();
+        Map<String, List<String>> tableToSinkIdentifiersMap = new HashMap<>();
+        if (jobDAGInfo != null && jobDAGInfo.getVertexInfoMap() != null) {
+            for (VertexInfo vertexInfo : jobDAGInfo.getVertexInfoMap().values()) {
+                String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());

Review Comment:
   The condition at line 273 lacks a clarifying comment. The check 
'identifier.equals(vertexInfo.getConnectorType())' is used to detect when the 
extractSinkIdentifier method failed to extract a pattern (meaning it returned 
the original input unchanged). This logic is not immediately obvious to 
readers. Consider adding a comment explaining that this condition filters out 
vertices where no identifier pattern was found, or refactor 
extractSinkIdentifier to return an Optional to make the intent clearer.
   ```suggestion
                   String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());
                   // When extractSinkIdentifier cannot find a pattern, it returns the original
                   // connectorType. In that case (identifier equals connectorType) we skip this
                   // vertex because no specific identifier was extracted for metrics grouping.
   ```
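   A minimal sketch of the Optional-returning alternative, assuming connectorType carries names in the `Source[i]-plugin`, `Sink[i]-plugin-table` and `Transform[i]-plugin` formats built by JobConfigParser and that the identifier is the `Type[index]` prefix (for example `Sink[0]` from `Sink[0]-Console-db.table`). The real regex at line 500 is not part of this diff, so the pattern and the class name `VertexIdentifiers` are hypothetical.

   ```java
   import java.util.Optional;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   final class VertexIdentifiers {
       // Hypothetical pattern: captures the "Type[index]" prefix of names such as
       // "Sink[0]-Console-db.table". The real pattern in BaseService may differ.
       private static final Pattern IDENTIFIER_PATTERN =
               Pattern.compile("^((?:Source|Sink|Transform)\\[\\d+\\])-.+$");

       private VertexIdentifiers() {}

       /** Returns the "Type[index]" prefix, or Optional.empty() when the name does not match. */
       static Optional<String> extractPluginIdentifier(String vertexName) {
           if (vertexName == null) {
               return Optional.empty();
           }
           Matcher matcher = IDENTIFIER_PATTERN.matcher(vertexName);
           return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
       }
   }
   ```

   With this shape the caller can skip a vertex with `if (!identifier.isPresent() || vertexInfo.getTablePaths() == null) continue;` instead of comparing the result with its own input. Using `isPresent()` rather than `isEmpty()` keeps the sketch Java 8 compatible.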



##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java:
##########
@@ -46,15 +46,15 @@ public JobConfigParser(
         this.isStartWithSavePoint = isStartWithSavePoint;
     }
 
-    static String createSourceActionName(int configIndex, String pluginName) {
+    public static String createSourceActionName(int configIndex, String pluginName) {
         return String.format("Source[%s]-%s", configIndex, pluginName);
     }
 
-    static String createSinkActionName(int configIndex, String pluginName, String table) {
+    public static String createSinkActionName(int configIndex, String pluginName, String table) {
         return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
     }
 
-    static String createTransformActionName(int configIndex, String pluginName) {
+    public static String createTransformActionName(int configIndex, String pluginName) {
         return String.format("Transform[%s]-%s", configIndex, pluginName);

Review Comment:
   The methods createSourceActionName, createSinkActionName, and 
createTransformActionName are changed from package-private to public 
visibility. While this change enables their use in tests, there is no 
documentation explaining their purpose, parameters, or expected usage. Consider 
adding JavaDoc comments to these newly public methods to help external callers 
understand how to use them correctly, especially since they are now part of the 
public API of the class.
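   One possible shape for that JavaDoc, shown on a hypothetical standalone class `ActionNames` so the sketch compiles on its own. The format strings are copied from the diff; the parameter descriptions are assumptions about how the methods are used and should be checked against JobConfigParser.

   ```java
   /**
    * Hypothetical standalone copy of the JobConfigParser naming helpers, used only to
    * illustrate JavaDoc for the newly public methods.
    */
   final class ActionNames {
       private ActionNames() {}

       /**
        * Builds the action name of a source plugin.
        *
        * @param configIndex position of the source block in the job configuration
        * @param pluginName plugin name of the source, e.g. {@code FakeSource}
        * @return a name of the form {@code Source[configIndex]-pluginName}
        */
       public static String createSourceActionName(int configIndex, String pluginName) {
           return String.format("Source[%s]-%s", configIndex, pluginName);
       }

       /**
        * Builds the action name of a sink plugin for one table.
        *
        * @param configIndex position of the sink block in the job configuration
        * @param pluginName plugin name of the sink, e.g. {@code Console}
        * @param table name of the table handled by this sink action
        * @return a name of the form {@code Sink[configIndex]-pluginName-table}
        */
       public static String createSinkActionName(int configIndex, String pluginName, String table) {
           return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
       }

       /**
        * Builds the action name of a transform plugin.
        *
        * @param configIndex position of the transform block in the job configuration
        * @param pluginName plugin name of the transform, e.g. {@code Sql}
        * @return a name of the form {@code Transform[configIndex]-pluginName}
        */
       public static String createTransformActionName(int configIndex, String pluginName) {
           return String.format("Transform[%s]-%s", configIndex, pluginName);
       }
   }
   ```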



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -320,8 +357,22 @@ private Map<String, Object> getJobMetrics(String jobMetrics) {
                                     String tableName =
                                             TablePath.of(metricName.split("#")[1]).getFullName();
                                     JsonNode metricNode = jobMetricsStr.get(metricName);
+
+                                    Map<String, java.util.List<String>> identifiersMap = null;
+                                    if (metricName.startsWith("TableSource")
+                                            || metricName.startsWith("Source")) {
+                                        identifiersMap = tableToSourceIdentifiersMap;
+                                    } else if (metricName.startsWith("TableSink")
+                                            || metricName.startsWith("Sink")) {
+                                        identifiersMap = tableToSinkIdentifiersMap;
+                                    }
+
                                     processMetric(
-                                            metricName, tableName, metricNode, tableMetricsMaps);
+                                            metricName,
+                                            tableName,
+                                            metricNode,
+                                            tableMetricsMaps,
+                                            identifiersMap);

Review Comment:
   Potential null pointer issue: When identifiersMap is null (which happens 
when a metric name contains "#" but doesn't start with TableSource, Source, 
TableSink, or Sink), the method processMetric is still called with a null map 
at line 375. Inside processMetric at line 415, calling .get(tableName) on a 
null map will throw a NullPointerException. While current metric names appear 
to always match the expected prefixes, this creates a fragile dependency. 
Consider adding a null check at line 415 (e.g., 'if (tableToSinkIdentifiersMap 
!= null)') or validating that identifiersMap is not null before calling 
processMetric to make the code more robust.
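   A minimal sketch of the null-safe lookup option, assuming processMetric only needs a (possibly empty) list of identifiers. The helper `identifiersFor` and its class `IdentifierLookup` are hypothetical names.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   final class IdentifierLookup {
       private IdentifierLookup() {}

       /** Returns the identifiers registered for tableName, never null. */
       static List<String> identifiersFor(
               Map<String, List<String>> tableToIdentifiersMap, String tableName) {
           if (tableToIdentifiersMap == null) {
               // Unrecognised metric prefix: no identifiers map was selected by the caller.
               return Collections.emptyList();
           }
           List<String> identifiers = tableToIdentifiersMap.get(tableName);
           // A table without registered identifiers is treated the same as a missing map.
           return identifiers != null ? identifiers : Collections.<String>emptyList();
       }
   }
   ```

   Because an empty list fails the existing `!sinkIdentifiers.isEmpty()` checks, a metric with an unrecognised prefix would simply keep the plain `tableName` key instead of throwing.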



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
             String metricName,
             String tableName,
             JsonNode metricNode,
-            Map<String, JsonNode>[] tableMetricsMaps) {
+            Map<String, JsonNode>[] tableMetricsMaps,
+            Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
         if (metricNode == null) {
             return;
         }
 
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+        if (sinkIdentifiers != null
+                && !sinkIdentifiers.isEmpty()
+                && metricNode.isArray()
+                && sinkIdentifiers.size() > 1) {
+            int arraySize = metricNode.size();
+
+            if (arraySize == sinkIdentifiers.size()) {
+                ObjectMapper mapper = new ObjectMapper();
+                for (int i = 0; i < arraySize; i++) {
+                    String sinkIdentifier = sinkIdentifiers.get(i);

Review Comment:
   The variable 'sinkIdentifier' at line 426 is misleadingly named. This 
variable holds an identifier that could be for either a source or a sink, 
depending on the context. The name should reflect this dual purpose. Consider 
renaming it to 'identifier' to avoid confusion.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -374,30 +466,43 @@ private void processMetric(
                 SINK_BYTES_SEC_IDX = 10,
                 SINK_COMMITTED_BYTES_SEC_IDX = 11;
         if (metricName.startsWith(SOURCE_RECEIVED_COUNT + "#")) {
-            tableMetricsMaps[SOURCE_COUNT_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SOURCE_COUNT_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_COUNT + "#")) {
-            tableMetricsMaps[SINK_COUNT_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COUNT_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_COMMITTED_COUNT + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_COUNT_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_COUNT_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES + "#")) {
-            tableMetricsMaps[SOURCE_BYTES_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SOURCE_BYTES_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_BYTES + "#")) {
-            tableMetricsMaps[SINK_BYTES_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_BYTES_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_COMMITTED_BYTES + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_BYTES_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_BYTES_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SOURCE_RECEIVED_QPS + "#")) {
-            tableMetricsMaps[SOURCE_QPS_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SOURCE_QPS_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_QPS + "#")) {
-            tableMetricsMaps[SINK_QPS_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_QPS_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_COMMITTED_QPS + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_QPS_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_QPS_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES_PER_SECONDS + "#")) {
-            tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_BYTES_PER_SECONDS + "#")) {
-            tableMetricsMaps[SINK_BYTES_SEC_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_BYTES_SEC_IDX].put(metricKey, metricNode);
         } else if (metricName.startsWith(SINK_COMMITTED_BYTES_PER_SECONDS + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_BYTES_SEC_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_BYTES_SEC_IDX].put(metricKey, metricNode);
+        }
+    }
+
+    private String extractSinkIdentifier(String vertexName) {

Review Comment:
   The method name 'extractSinkIdentifier' is misleading because it can extract 
identifiers for Source, Sink, or Transform plugins, not just sinks. The regex 
pattern on line 500 explicitly matches all three types. Consider renaming the 
method to 'extractPluginIdentifier' or 'extractVertexIdentifier' to accurately 
reflect its broader purpose.
   ```suggestion
       private String extractPluginIdentifier(String vertexName) {
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -248,11 +256,40 @@ protected JsonObject getJobInfoJson(
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
                 .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
                 .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
-                .add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(jobMetrics)));
+                .add(
+                        RestConstant.METRICS,
+                        metricsToJsonObject(getJobMetrics(jobMetrics, jobDAGInfo)));
     }
 
-    private Map<String, Object> getJobMetrics(String jobMetrics) {
+    private Map<String, Object> getJobMetrics(String jobMetrics, JobDAGInfo jobDAGInfo) {
         Map<String, Object> metricsMap = new HashMap<>();
+
+        Map<String, List<String>> tableToSourceIdentifiersMap = new HashMap<>();
+        Map<String, List<String>> tableToSinkIdentifiersMap = new HashMap<>();
+        if (jobDAGInfo != null && jobDAGInfo.getVertexInfoMap() != null) {
+            for (VertexInfo vertexInfo : jobDAGInfo.getVertexInfoMap().values()) {
+                String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());
+                if (vertexInfo.getTablePaths() == null
+                        || identifier.equals(vertexInfo.getConnectorType())) {
+                    continue;
+                }
+                Map<String, List<String>> targetMap = null;
+                if (vertexInfo.getType() == PluginType.SOURCE) {
+                    targetMap = tableToSourceIdentifiersMap;
+                } else if (vertexInfo.getType() == PluginType.SINK) {
+                    targetMap = tableToSinkIdentifiersMap;

Review Comment:
   The code at lines 276-281 only handles SOURCE and SINK plugin types, 
ignoring TRANSFORM types. However, the extractSinkIdentifier method on line 271 
explicitly extracts Transform identifiers (as shown in the regex pattern on 
line 500). This inconsistency could lead to confusion. If Transform metrics are 
not expected to be processed in this context, consider updating the regex 
pattern to exclude Transform. If Transform metrics should be handled, add a 
case for PluginType.TRANSFORM.
   ```suggestion
                       targetMap = tableToSinkIdentifiersMap;
                   } else if (vertexInfo.getType() == PluginType.TRANSFORM) {
                       // Currently, transform plugin metrics are not aggregated by table
                       // in this method. We still call extractSinkIdentifier for all
                       // plugin types, but intentionally do not populate a target map
                       // for TRANSFORM here.
   ```
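   A sketch of the "handle TRANSFORM explicitly" option, using simplified stand-ins: the nested `PluginType` enum, the `Vertex` class and `IdentifierGrouping` are hypothetical and only model the fields this loop reads (the real VertexInfo also exposes table paths as TablePath objects, not strings). A `switch` makes the deliberately ignored case visible in the code.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   final class IdentifierGrouping {
       // Stand-in for SeaTunnel's PluginType, limited to the constants used here.
       enum PluginType { SOURCE, SINK, TRANSFORM }

       // Hypothetical, simplified stand-in for VertexInfo.
       static final class Vertex {
           final PluginType type;
           final String identifier;
           final List<String> tablePaths;

           Vertex(PluginType type, String identifier, List<String> tablePaths) {
               this.type = type;
               this.identifier = identifier;
               this.tablePaths = tablePaths;
           }
       }

       final Map<String, List<String>> tableToSourceIdentifiers = new HashMap<>();
       final Map<String, List<String>> tableToSinkIdentifiers = new HashMap<>();

       void add(Vertex vertex) {
           Map<String, List<String>> targetMap;
           switch (vertex.type) {
               case SOURCE:
                   targetMap = tableToSourceIdentifiers;
                   break;
               case SINK:
                   targetMap = tableToSinkIdentifiers;
                   break;
               case TRANSFORM:
               default:
                   // Transform metrics are not grouped by table here; skipping is deliberate.
                   return;
           }
           for (String table : vertex.tablePaths) {
               targetMap.computeIfAbsent(table, k -> new ArrayList<>()).add(vertex.identifier);
           }
       }
   }
   ```

   If Transform metrics should be grouped later, only the `TRANSFORM` case needs to change.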



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
             String metricName,
             String tableName,
             JsonNode metricNode,
-            Map<String, JsonNode>[] tableMetricsMaps) {
+            Map<String, JsonNode>[] tableMetricsMaps,
+            Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {

Review Comment:
   The parameter name 'tableToSinkIdentifiersMap' is misleading in the 
processMetric method. This parameter can receive either 
tableToSourceIdentifiersMap or tableToSinkIdentifiersMap depending on the 
metric type (as seen in lines 361-368), but the parameter name suggests it only 
handles sink identifiers. This makes the code harder to understand and 
maintain. Consider renaming it to 'tableToIdentifiersMap' or 'identifiersMap' 
to better reflect its dual purpose.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
             String metricName,
             String tableName,
             JsonNode metricNode,
-            Map<String, JsonNode>[] tableMetricsMaps) {
+            Map<String, JsonNode>[] tableMetricsMaps,
+            Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
         if (metricNode == null) {
             return;
         }
 
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);

Review Comment:
   The variable 'sinkIdentifiers' at line 415 is misleadingly named. This 
variable can contain either source or sink identifiers depending on which map 
is passed in (tableToSourceIdentifiersMap or tableToSinkIdentifiersMap). Using 
the name 'sinkIdentifiers' when it might actually contain source identifiers 
makes the code confusing. Consider renaming it to 'identifiers' to accurately 
reflect what it contains.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
             String metricName,
             String tableName,
             JsonNode metricNode,
-            Map<String, JsonNode>[] tableMetricsMaps) {
+            Map<String, JsonNode>[] tableMetricsMaps,
+            Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
         if (metricNode == null) {
             return;
         }
 
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+        if (sinkIdentifiers != null
+                && !sinkIdentifiers.isEmpty()
+                && metricNode.isArray()
+                && sinkIdentifiers.size() > 1) {
+            int arraySize = metricNode.size();
+
+            if (arraySize == sinkIdentifiers.size()) {
+                ObjectMapper mapper = new ObjectMapper();
+                for (int i = 0; i < arraySize; i++) {
+                    String sinkIdentifier = sinkIdentifiers.get(i);
+                    String metricKey = sinkIdentifier + "." + tableName;
+
+                    try {
+                        String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
+                        JsonNode arrayNode = mapper.readTree(json);

Review Comment:
   The JSON serialization and deserialization at lines 430-431 appear 
inefficient. The code converts a single JsonNode element to a JSON string, 
wraps it in brackets to create an array string, and then parses it back to a 
JsonNode array. This round-trip conversion is unnecessary overhead. Consider 
creating the array node directly using Jackson's API (e.g., 
mapper.createArrayNode().add(metricNode.get(i))) instead of string manipulation 
and re-parsing.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
             String metricName,
             String tableName,
             JsonNode metricNode,
-            Map<String, JsonNode>[] tableMetricsMaps) {
+            Map<String, JsonNode>[] tableMetricsMaps,
+            Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
         if (metricNode == null) {
             return;
         }
 
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+        if (sinkIdentifiers != null
+                && !sinkIdentifiers.isEmpty()
+                && metricNode.isArray()
+                && sinkIdentifiers.size() > 1) {
+            int arraySize = metricNode.size();
+
+            if (arraySize == sinkIdentifiers.size()) {
+                ObjectMapper mapper = new ObjectMapper();
+                for (int i = 0; i < arraySize; i++) {
+                    String sinkIdentifier = sinkIdentifiers.get(i);
+                    String metricKey = sinkIdentifier + "." + tableName;
+
+                    try {
+                        String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
+                        JsonNode arrayNode = mapper.readTree(json);
+                        putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
+                    } catch (JsonProcessingException e) {
+                        putMetricToMap(metricName, metricKey, metricNode.get(i), tableMetricsMaps);
+                    }
+                }
+                return;
+            }
+        }
+
+        String metricKey = tableName;
+        if (sinkIdentifiers != null && !sinkIdentifiers.isEmpty()) {
+            metricKey = sinkIdentifiers.get(0) + "." + tableName;

Review Comment:
   The condition at line 420 checks if sinkIdentifiers.size() is greater than 
1, but the subsequent check at line 423 verifies if arraySize equals 
sinkIdentifiers.size(). This means when there's only one identifier 
(sinkIdentifiers.size() == 1), the code skips to line 441 and uses 
sinkIdentifiers.get(0). However, when sinkIdentifiers.size() > 1 but arraySize 
!= sinkIdentifiers.size(), the code also falls through to line 441 and only 
uses the first identifier, potentially losing data. Consider adding a warning 
log or handling this mismatch case more explicitly to alert developers when the 
array size doesn't match the expected number of identifiers.
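   A sketch of how the key selection could surface that mismatch while keeping the current fall-back to the first identifier. The class `MetricKeys` and method `metricKeys` are hypothetical, and `java.util.logging` stands in for whatever logger BaseService actually uses.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.logging.Logger;

   final class MetricKeys {
       private static final Logger LOG = Logger.getLogger(MetricKeys.class.getName());

       private MetricKeys() {}

       /**
        * Returns one key per array element when the element count matches the identifier
        * count, otherwise a single key; an array-size mismatch is logged, not silent.
        */
       static List<String> metricKeys(
               List<String> identifiers, boolean isArray, int arraySize, String tableName) {
           if (identifiers == null || identifiers.isEmpty()) {
               return Collections.singletonList(tableName);
           }
           if (identifiers.size() > 1 && isArray) {
               if (arraySize == identifiers.size()) {
                   List<String> keys = new ArrayList<>(arraySize);
                   for (String identifier : identifiers) {
                       keys.add(identifier + "." + tableName);
                   }
                   return keys;
               }
               LOG.warning(String.format(
                       "Metric array size %d does not match %d identifiers for table %s; "
                               + "attributing it to %s only",
                       arraySize, identifiers.size(), tableName, identifiers.get(0)));
           }
           // Single identifier, non-array node, or mismatch: same fall-back as the PR.
           return Collections.singletonList(identifiers.get(0) + "." + tableName);
       }
   }
   ```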


