Copilot commented on code in PR #10273:
URL: https://github.com/apache/seatunnel/pull/10273#discussion_r2670699484
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
String metricName,
String tableName,
JsonNode metricNode,
- Map<String, JsonNode>[] tableMetricsMaps) {
+ Map<String, JsonNode>[] tableMetricsMaps,
+ Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
if (metricNode == null) {
return;
}
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+ if (sinkIdentifiers != null
+ && !sinkIdentifiers.isEmpty()
+ && metricNode.isArray()
+ && sinkIdentifiers.size() > 1) {
+ int arraySize = metricNode.size();
+
+ if (arraySize == sinkIdentifiers.size()) {
+ ObjectMapper mapper = new ObjectMapper();
Review Comment:
A new ObjectMapper instance is created on every invocation of processMetric (line 424), which is
inefficient: ObjectMapper instantiation is relatively expensive, and a single instance is
thread-safe and reusable. Consider creating the ObjectMapper once, as a static or instance
field, to improve performance, especially when processing metrics for multiple sink identifiers.
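A minimal sketch of the reuse, assuming a shared field on BaseService is acceptable (the field name is illustrative):
```java
// Illustrative sketch: one shared, thread-safe ObjectMapper for metric processing.
// The field name METRIC_MAPPER is an assumption; any once-per-class instance works.
private static final ObjectMapper METRIC_MAPPER = new ObjectMapper();

// ... inside processMetric, use the shared instance instead of `new ObjectMapper()`:
String json = "[" + METRIC_MAPPER.writeValueAsString(metricNode.get(i)) + "]";
JsonNode arrayNode = METRIC_MAPPER.readTree(json);
```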
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -248,11 +256,40 @@ protected JsonObject getJobInfoJson(
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
-                        .add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(jobMetrics)));
+ .add(
+ RestConstant.METRICS,
+                                metricsToJsonObject(getJobMetrics(jobMetrics, jobDAGInfo)));
}
- private Map<String, Object> getJobMetrics(String jobMetrics) {
+    private Map<String, Object> getJobMetrics(String jobMetrics, JobDAGInfo jobDAGInfo) {
Map<String, Object> metricsMap = new HashMap<>();
+
+        Map<String, List<String>> tableToSourceIdentifiersMap = new HashMap<>();
+ Map<String, List<String>> tableToSinkIdentifiersMap = new HashMap<>();
+ if (jobDAGInfo != null && jobDAGInfo.getVertexInfoMap() != null) {
+            for (VertexInfo vertexInfo : jobDAGInfo.getVertexInfoMap().values()) {
+                String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());
Review Comment:
The condition at line 273 lacks a clarifying comment. The check
'identifier.equals(vertexInfo.getConnectorType())' is used to detect when the
extractSinkIdentifier method failed to extract a pattern (meaning it returned
the original input unchanged). This logic is not immediately obvious to
readers. Consider adding a comment explaining that this condition filters out
vertices where no identifier pattern was found, or refactor
extractSinkIdentifier to return an Optional to make the intent clearer.
```suggestion
                String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());
                // When extractSinkIdentifier cannot find a pattern, it returns the original
                // connectorType. In that case (identifier equals connectorType) we skip this
                // vertex because no specific identifier was extracted for metrics grouping.
```
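If the Optional-based refactor is preferred instead of the comment, the call site could look roughly like this (extractPluginIdentifier is the hypothetical Optional-returning variant, not the current method):
```java
// Hypothetical Optional-returning variant of extractSinkIdentifier; the skip
// condition then states its intent directly instead of comparing strings.
Optional<String> identifier = extractPluginIdentifier(vertexInfo.getConnectorType());
if (vertexInfo.getTablePaths() == null || !identifier.isPresent()) {
    continue;
}
```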
##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java:
##########
@@ -46,15 +46,15 @@ public JobConfigParser(
this.isStartWithSavePoint = isStartWithSavePoint;
}
- static String createSourceActionName(int configIndex, String pluginName) {
+    public static String createSourceActionName(int configIndex, String pluginName) {
return String.format("Source[%s]-%s", configIndex, pluginName);
}
-    static String createSinkActionName(int configIndex, String pluginName, String table) {
+    public static String createSinkActionName(int configIndex, String pluginName, String table) {
return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
}
-    static String createTransformActionName(int configIndex, String pluginName) {
+    public static String createTransformActionName(int configIndex, String pluginName) {
return String.format("Transform[%s]-%s", configIndex, pluginName);
Review Comment:
The methods createSourceActionName, createSinkActionName, and
createTransformActionName are changed from package-private to public
visibility. While this change enables their use in tests, there is no
documentation explaining their purpose, parameters, or expected usage. Consider
adding JavaDoc comments to these newly public methods to help external callers
understand how to use them correctly, especially since they are now part of the
public API of the class.
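For example, wording along these lines (descriptions and the example plugin name are suggestions only):
```java
/**
 * Builds the display name of a source action, e.g. {@code Source[0]-Jdbc}.
 *
 * @param configIndex index of the source block in the job configuration
 * @param pluginName  name of the source connector plugin
 * @return the formatted source action name used to identify the vertex
 */
public static String createSourceActionName(int configIndex, String pluginName) {
    return String.format("Source[%s]-%s", configIndex, pluginName);
}
```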
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -320,8 +357,22 @@ private Map<String, Object> getJobMetrics(String jobMetrics) {
                        String tableName = TablePath.of(metricName.split("#")[1]).getFullName();
                        JsonNode metricNode = jobMetricsStr.get(metricName);
+
+                        Map<String, java.util.List<String>> identifiersMap = null;
+                        if (metricName.startsWith("TableSource")
+                                || metricName.startsWith("Source")) {
+                            identifiersMap = tableToSourceIdentifiersMap;
+                        } else if (metricName.startsWith("TableSink")
+                                || metricName.startsWith("Sink")) {
+                            identifiersMap = tableToSinkIdentifiersMap;
+                        }
+
                        processMetric(
-                                metricName, tableName, metricNode, tableMetricsMaps);
+                                metricName,
+                                tableName,
+                                metricNode,
+                                tableMetricsMaps,
+                                identifiersMap);
Review Comment:
Potential null pointer issue: When identifiersMap is null (which happens
when a metric name contains "#" but doesn't start with TableSource, Source,
TableSink, or Sink), the method processMetric is still called with a null map
at line 375. Inside processMetric at line 415, calling .get(tableName) on a
null map will throw a NullPointerException. While current metric names appear
to always match the expected prefixes, this creates a fragile dependency.
Consider adding a null check at line 415 (e.g., 'if (tableToSinkIdentifiersMap
!= null)') or validating that identifiersMap is not null before calling
processMetric to make the code more robust.
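A minimal sketch of the defensive lookup inside processMetric, keeping the current parameter names:
```java
// Guard against a null identifiers map so metrics with unexpected prefixes
// fall back to the existing per-table handling instead of throwing an NPE.
java.util.List<String> sinkIdentifiers =
        tableToSinkIdentifiersMap == null ? null : tableToSinkIdentifiersMap.get(tableName);
```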
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
String metricName,
String tableName,
JsonNode metricNode,
- Map<String, JsonNode>[] tableMetricsMaps) {
+ Map<String, JsonNode>[] tableMetricsMaps,
+ Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
if (metricNode == null) {
return;
}
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+ if (sinkIdentifiers != null
+ && !sinkIdentifiers.isEmpty()
+ && metricNode.isArray()
+ && sinkIdentifiers.size() > 1) {
+ int arraySize = metricNode.size();
+
+ if (arraySize == sinkIdentifiers.size()) {
+ ObjectMapper mapper = new ObjectMapper();
+ for (int i = 0; i < arraySize; i++) {
+ String sinkIdentifier = sinkIdentifiers.get(i);
Review Comment:
The variable 'sinkIdentifier' at line 426 is misleadingly named. This
variable holds an identifier that could be for either a source or a sink,
depending on the context. The name should reflect this dual purpose. Consider
renaming it to 'identifier' to avoid confusion.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -374,30 +466,43 @@ private void processMetric(
SINK_BYTES_SEC_IDX = 10,
SINK_COMMITTED_BYTES_SEC_IDX = 11;
if (metricName.startsWith(SOURCE_RECEIVED_COUNT + "#")) {
- tableMetricsMaps[SOURCE_COUNT_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SOURCE_COUNT_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SINK_WRITE_COUNT + "#")) {
- tableMetricsMaps[SINK_COUNT_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SINK_COUNT_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SINK_COMMITTED_COUNT + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_COUNT_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_COUNT_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SOURCE_RECEIVED_BYTES + "#")) {
- tableMetricsMaps[SOURCE_BYTES_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SOURCE_BYTES_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SINK_WRITE_BYTES + "#")) {
- tableMetricsMaps[SINK_BYTES_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SINK_BYTES_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SINK_COMMITTED_BYTES + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_BYTES_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_BYTES_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SOURCE_RECEIVED_QPS + "#")) {
- tableMetricsMaps[SOURCE_QPS_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SOURCE_QPS_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SINK_WRITE_QPS + "#")) {
- tableMetricsMaps[SINK_QPS_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SINK_QPS_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SINK_COMMITTED_QPS + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_QPS_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_QPS_IDX].put(metricKey, metricNode);
        } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES_PER_SECONDS + "#")) {
- tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(metricKey, metricNode);
} else if (metricName.startsWith(SINK_WRITE_BYTES_PER_SECONDS + "#")) {
- tableMetricsMaps[SINK_BYTES_SEC_IDX].put(tableName, metricNode);
+ tableMetricsMaps[SINK_BYTES_SEC_IDX].put(metricKey, metricNode);
        } else if (metricName.startsWith(SINK_COMMITTED_BYTES_PER_SECONDS + "#")) {
-            tableMetricsMaps[SINK_COMMITTED_BYTES_SEC_IDX].put(tableName, metricNode);
+            tableMetricsMaps[SINK_COMMITTED_BYTES_SEC_IDX].put(metricKey, metricNode);
+ }
+ }
+
+ private String extractSinkIdentifier(String vertexName) {
Review Comment:
The method name 'extractSinkIdentifier' is misleading because it can extract
identifiers for Source, Sink, or Transform plugins, not just sinks. The regex
pattern on line 500 explicitly matches all three types. Consider renaming the
method to 'extractPluginIdentifier' or 'extractVertexIdentifier' to accurately
reflect its broader purpose.
```suggestion
private String extractPluginIdentifier(String vertexName) {
```
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -248,11 +256,40 @@ protected JsonObject getJobInfoJson(
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
-                        .add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(jobMetrics)));
+ .add(
+ RestConstant.METRICS,
+                                metricsToJsonObject(getJobMetrics(jobMetrics, jobDAGInfo)));
}
- private Map<String, Object> getJobMetrics(String jobMetrics) {
+    private Map<String, Object> getJobMetrics(String jobMetrics, JobDAGInfo jobDAGInfo) {
Map<String, Object> metricsMap = new HashMap<>();
+
+        Map<String, List<String>> tableToSourceIdentifiersMap = new HashMap<>();
+ Map<String, List<String>> tableToSinkIdentifiersMap = new HashMap<>();
+ if (jobDAGInfo != null && jobDAGInfo.getVertexInfoMap() != null) {
+            for (VertexInfo vertexInfo : jobDAGInfo.getVertexInfoMap().values()) {
+                String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());
+ if (vertexInfo.getTablePaths() == null
+ || identifier.equals(vertexInfo.getConnectorType())) {
+ continue;
+ }
+ Map<String, List<String>> targetMap = null;
+ if (vertexInfo.getType() == PluginType.SOURCE) {
+ targetMap = tableToSourceIdentifiersMap;
+ } else if (vertexInfo.getType() == PluginType.SINK) {
+ targetMap = tableToSinkIdentifiersMap;
Review Comment:
The code at lines 276-281 only handles SOURCE and SINK plugin types,
ignoring TRANSFORM types. However, the extractSinkIdentifier method on line 271
explicitly extracts Transform identifiers (as shown in the regex pattern on
line 500). This inconsistency could lead to confusion. If Transform metrics are
not expected to be processed in this context, consider updating the regex
pattern to exclude Transform. If Transform metrics should be handled, add a
case for PluginType.TRANSFORM.
```suggestion
targetMap = tableToSinkIdentifiersMap;
} else if (vertexInfo.getType() == PluginType.TRANSFORM) {
                    // Currently, transform plugin metrics are not aggregated by table
                    // in this method. We still call extractSinkIdentifier for all
                    // plugin types, but intentionally do not populate a target map
                    // for TRANSFORM here.
```
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
String metricName,
String tableName,
JsonNode metricNode,
- Map<String, JsonNode>[] tableMetricsMaps) {
+ Map<String, JsonNode>[] tableMetricsMaps,
+ Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
Review Comment:
The parameter name 'tableToSinkIdentifiersMap' is misleading in the
processMetric method. This parameter can receive either
tableToSourceIdentifiersMap or tableToSinkIdentifiersMap depending on the
metric type (as seen in lines 361-368), but the parameter name suggests it only
handles sink identifiers. This makes the code harder to understand and
maintain. Consider renaming it to 'tableToIdentifiersMap' or 'identifiersMap'
to better reflect its dual purpose.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
String metricName,
String tableName,
JsonNode metricNode,
- Map<String, JsonNode>[] tableMetricsMaps) {
+ Map<String, JsonNode>[] tableMetricsMaps,
+ Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
if (metricNode == null) {
return;
}
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
Review Comment:
The variable 'sinkIdentifiers' at line 415 is misleadingly named. This
variable can contain either source or sink identifiers depending on which map
is passed in (tableToSourceIdentifiersMap or tableToSinkIdentifiersMap). Using
the name 'sinkIdentifiers' when it might actually contain source identifiers
makes the code confusing. Consider renaming it to 'identifiers' to accurately
reflect what it contains.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
String metricName,
String tableName,
JsonNode metricNode,
- Map<String, JsonNode>[] tableMetricsMaps) {
+ Map<String, JsonNode>[] tableMetricsMaps,
+ Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
if (metricNode == null) {
return;
}
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+ if (sinkIdentifiers != null
+ && !sinkIdentifiers.isEmpty()
+ && metricNode.isArray()
+ && sinkIdentifiers.size() > 1) {
+ int arraySize = metricNode.size();
+
+ if (arraySize == sinkIdentifiers.size()) {
+ ObjectMapper mapper = new ObjectMapper();
+ for (int i = 0; i < arraySize; i++) {
+ String sinkIdentifier = sinkIdentifiers.get(i);
+ String metricKey = sinkIdentifier + "." + tableName;
+
+ try {
+                    String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
+ JsonNode arrayNode = mapper.readTree(json);
Review Comment:
The JSON serialization and deserialization at lines 430-431 are inefficient. The code
converts a single JsonNode element to a JSON string,
wraps it in brackets to create an array string, and then parses it back to a
JsonNode array. This round-trip conversion is unnecessary overhead. Consider
creating the array node directly using Jackson's API (e.g.,
mapper.createArrayNode().add(metricNode.get(i))) instead of string manipulation
and re-parsing.
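Roughly, assuming the surrounding loop stays as-is (ArrayNode comes from com.fasterxml.jackson.databind.node):
```java
// Build the single-element array node directly; no string round trip
// and no JsonProcessingException to catch.
ArrayNode arrayNode = mapper.createArrayNode().add(metricNode.get(i));
putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
```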
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -355,11 +406,52 @@ private void processMetric(
String metricName,
String tableName,
JsonNode metricNode,
- Map<String, JsonNode>[] tableMetricsMaps) {
+ Map<String, JsonNode>[] tableMetricsMaps,
+ Map<String, java.util.List<String>> tableToSinkIdentifiersMap) {
if (metricNode == null) {
return;
}
+        java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
+
+ if (sinkIdentifiers != null
+ && !sinkIdentifiers.isEmpty()
+ && metricNode.isArray()
+ && sinkIdentifiers.size() > 1) {
+ int arraySize = metricNode.size();
+
+ if (arraySize == sinkIdentifiers.size()) {
+ ObjectMapper mapper = new ObjectMapper();
+ for (int i = 0; i < arraySize; i++) {
+ String sinkIdentifier = sinkIdentifiers.get(i);
+ String metricKey = sinkIdentifier + "." + tableName;
+
+ try {
+                    String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
+ JsonNode arrayNode = mapper.readTree(json);
+                    putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
+ } catch (JsonProcessingException e) {
+                    putMetricToMap(metricName, metricKey, metricNode.get(i), tableMetricsMaps);
+ }
+ }
+ return;
+ }
+ }
+
+ String metricKey = tableName;
+ if (sinkIdentifiers != null && !sinkIdentifiers.isEmpty()) {
+ metricKey = sinkIdentifiers.get(0) + "." + tableName;
Review Comment:
The condition at line 420 checks if sinkIdentifiers.size() is greater than
1, but the subsequent check at line 423 verifies if arraySize equals
sinkIdentifiers.size(). This means when there's only one identifier
(sinkIdentifiers.size() == 1), the code skips to line 441 and uses
sinkIdentifiers.get(0). However, when sinkIdentifiers.size() > 1 but arraySize
!= sinkIdentifiers.size(), the code also falls through to line 441 and only
uses the first identifier, potentially losing data. Consider adding a warning
log or handling this mismatch case more explicitly to alert developers when the
array size doesn't match the expected number of identifiers.
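One way to surface the mismatch, assuming the class has (or can add) an SLF4J logger named log:
```java
// Warn when the metric array length does not match the number of identifiers,
// since only the first identifier will then be used for the whole table.
if (sinkIdentifiers != null && sinkIdentifiers.size() > 1
        && metricNode.isArray() && metricNode.size() != sinkIdentifiers.size()) {
    log.warn(
            "Metric {} for table {} has {} entries but {} identifiers; using {} for all entries",
            metricName, tableName, metricNode.size(), sinkIdentifiers.size(),
            sinkIdentifiers.get(0));
}
```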
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]