davidzollo commented on PR #10273:
URL: https://github.com/apache/seatunnel/pull/10273#issuecomment-3799555296
Good job. Here is the review result from Claude Code that you can refer to.
### Issue #1: Breaking Change Not Documented
**Problem**: This PR introduces a breaking change to the metrics API format
but does not document it in `incompatible-changes.md`.
**Impact**:
- All existing monitoring dashboards (Grafana) will break
- Prometheus alerting rules will fail
- Custom monitoring integrations will need updates
- Users upgrading will experience silent monitoring failures
**Required Actions**:
1. Update `docs/zh/concept/incompatible-changes.md` (Chinese)
2. Update `docs/en/concept/incompatible-changes.md` (English)
3. Provide migration guide with before/after examples
4. Update PR description with clear breaking change warning
**Suggested Documentation Content**:
````markdown
## v2.x.x

### Breaking Change: Metrics API Format Change

**Affected Version**: 2.x.x+
**Affected Component**: REST API, Metrics System

**Description**:
To support multiple sinks/sources processing the same table, the metric key format
has been changed from `{tableName}` to `{VertexIdentifier}.{tableName}`.

**Before**:
```json
{
  "TableSinkWriteCount": {
    "fake.user_table": "15"
  }
}
```

**After**:
```json
{
  "TableSinkWriteCount": {
    "Sink[0].fake.user_table": "10",
    "Sink[1].fake.user_table": "5"
  }
}
```

**Migration Guide**:
1. Update Grafana dashboard queries to use the new metric key format
2. Update Prometheus alerting rules
3. If backward compatibility is needed, add the configuration: `metrics.use-legacy-format=true`
````
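For downstream consumers that parse metric keys programmatically, here is a minimal sketch of splitting the new key back into its vertex identifier and table name. The class and method names are hypothetical; the identifier pattern mirrors the one used in `BaseService` (see Issue #5 below):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class MetricKeyParser {

    // Matches keys like "Sink[0].fake.user_table"; legacy keys have no vertex prefix
    private static final Pattern KEY_PATTERN =
            Pattern.compile("^((?:Sink|Source|Transform)\\[\\d+\\])\\.(.+)$");

    /** Returns {vertexIdentifier, tableName}; vertexIdentifier is null for legacy-format keys. */
    public static String[] parse(String metricKey) {
        Matcher m = KEY_PATTERN.matcher(metricKey);
        if (m.matches()) {
            return new String[] {m.group(1), m.group(2)};
        }
        return new String[] {null, metricKey};
    }
}
```

A consumer can use the null-identifier branch to keep working against both old and new key formats during the upgrade window.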
**Files to Update**:
- `docs/zh/concept/incompatible-changes.md`
- `docs/en/concept/incompatible-changes.md`
- PR description
---
### Issue #2: Array Size Mismatch Handling is Flawed **HIGH**
**Location**:
[BaseService.java:417-446](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java#L417-L446)
**Problem**: When metric array size doesn't match the number of sinks, the
code blindly uses the first sink identifier, which can lead to incorrect metric
attribution.
**Current Code**:
```java
if (arraySize == sinkIdentifiers.size()) {
    // Handle matched case
    // ...
    return;
}
// If sizes don't match, blindly use the first identifier
String metricKey = tableName;
if (sinkIdentifiers != null && !sinkIdentifiers.isEmpty()) {
    metricKey = sinkIdentifiers.get(0) + "." + tableName; // ❌ Wrong!
}
```
**Why This is Critical**:
- **Scenario**: 2 sinks configured, but only Sink[1] has reported metrics yet
- **Current Behavior**: Labels the metric as `Sink[0].table` (WRONG)
- **Actual**: The metric is from Sink[1]
- **Impact**: Monitoring data is incorrect, misleading operations team
**Common Scenarios for Size Mismatch**:
1. Sink startup delay (very common in production)
2. Sink failure and restart
3. Network latency causing delayed metric reporting
4. Different parallelism across sinks
**Suggested Fix**:
```java
if (sinkIdentifiers != null
        && !sinkIdentifiers.isEmpty()
        && metricNode.isArray()
        && sinkIdentifiers.size() > 1) {
    int arraySize = metricNode.size();
    ObjectMapper mapper = new ObjectMapper(); // shared by the branches below
    if (arraySize == sinkIdentifiers.size()) {
        // Perfect match: assign by index
        for (int i = 0; i < arraySize; i++) {
            String sinkIdentifier = sinkIdentifiers.get(i);
            String metricKey = sinkIdentifier + "." + tableName;
            try {
                String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
                JsonNode arrayNode = mapper.readTree(json);
                putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
            } catch (JsonProcessingException e) {
                putMetricToMap(metricName, metricKey, metricNode.get(i), tableMetricsMaps);
            }
        }
        return;
    } else if (arraySize > 0 && arraySize < sinkIdentifiers.size()) {
        // Partial match: log warning and assign by index for available metrics
        log.warn(
                "Metric array size mismatch for table {}: expected {} sinks, got {} metrics. "
                        + "Some sinks may not be reporting metrics yet. This could indicate: "
                        + "1) Sink startup delay, 2) Sink failure, 3) Network latency",
                tableName, sinkIdentifiers.size(), arraySize);
        // Assign available metrics by index position
        for (int i = 0; i < arraySize; i++) {
            String sinkIdentifier = sinkIdentifiers.get(i);
            String metricKey = sinkIdentifier + "." + tableName;
            try {
                String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
                JsonNode arrayNode = mapper.readTree(json);
                putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
            } catch (JsonProcessingException e) {
                putMetricToMap(metricName, metricKey, metricNode.get(i), tableMetricsMaps);
            }
        }
        return;
    } else if (arraySize > sinkIdentifiers.size()) {
        // More metrics than expected sinks - serious configuration issue
        log.error(
                "Invalid metric array size for table {}: received {} metrics but only {} sinks "
                        + "configured. This indicates a serious configuration or collection error.",
                tableName, arraySize, sinkIdentifiers.size());
        // Fall through to default handling
    }
}
// Default/fallback handling for single value or unmatched scenarios
String metricKey = tableName;
if (sinkIdentifiers != null && !sinkIdentifiers.isEmpty()) {
    if (metricNode.isArray() && metricNode.size() == 1) {
        log.debug(
                "Single metric value for table {} with {} configured sinks, assigning to first sink {}",
                tableName, sinkIdentifiers.size(), sinkIdentifiers.get(0));
        metricKey = sinkIdentifiers.get(0) + "." + tableName;
    } else {
        log.warn(
                "Cannot reliably determine sink assignment for table {} metric (isArray={}, size={}), "
                        + "using table name only to avoid incorrect attribution",
                tableName, metricNode.isArray(),
                metricNode.isArray() ? metricNode.size() : "N/A");
    }
}
putMetricToMap(metricName, metricKey, metricNode, tableMetricsMaps);
```
**Key Improvements**:
1. Handle the partial match scenario (some sinks not reporting yet; see the example below)
2. Add comprehensive logging for debugging
3. Explicitly handle array size > sink count (configuration error)
4. Fall back to table name when assignment is unreliable
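For example, under the partial-match handling above, a job with 2 configured sinks where only one metric has been reported would yield a single, index-assigned entry (illustrative values, matching the test in Issue #7):

```json
{
  "TableSinkWriteCount": {
    "Sink[0].fake.user_table": "100"
  }
}
```

The `Sink[1].fake.user_table` entry is intentionally absent rather than mislabeled, so dashboards show a gap instead of wrong data.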
---
### Issue #3: JSON Exception Silently Drops All Metrics **HIGH**
**Location**:
[BaseService.java:349-388](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java#L349-L388)
**Problem**: When JSON parsing fails, the method returns an empty metrics
map without any logging, causing complete silent failure of metrics collection.
**Current Code**:
```java
try {
    JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
    jobMetricsStr.fieldNames().forEachRemaining(metricName -> {
        // Process metrics
    });
    aggregateMetrics(...);
} catch (JsonProcessingException e) {
    return metricsMap; // ❌ Returns empty map, no logging!
}
```
**Impact**:
- Complete loss of monitoring visibility when JSON parsing fails
- No way to diagnose the issue without logs
- Production job appears to have no metrics (misleading)
**Suggested Fix**:
```java
try {
    JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
    jobMetricsStr.fieldNames().forEachRemaining(metricName -> {
        try {
            if (metricName.contains("#")) {
                String tableName = TablePath.of(metricName.split("#")[1]).getFullName();
                JsonNode metricNode = jobMetricsStr.get(metricName);
                Map<String, java.util.List<String>> identifiersMap = null;
                if (metricName.startsWith("TableSource") || metricName.startsWith("Source")) {
                    identifiersMap = tableToSourceIdentifiersMap;
                } else if (metricName.startsWith("TableSink") || metricName.startsWith("Sink")) {
                    identifiersMap = tableToSinkIdentifiersMap;
                }
                processMetric(metricName, tableName, metricNode, tableMetricsMaps, identifiersMap);
            }
        } catch (Exception e) {
            // Don't let one metric failure kill all metrics
            log.error(
                    "Failed to process individual metric '{}': {}. Continuing with other metrics.",
                    metricName, e.getMessage(), e);
        }
    });
    aggregateMetrics(...);
} catch (JsonProcessingException e) {
    log.error(
            "Failed to parse job metrics JSON: {}. Raw input (first 500 chars): {}",
            e.getMessage(),
            jobMetrics != null && jobMetrics.length() > 500
                    ? jobMetrics.substring(0, 500) + "..."
                    : jobMetrics,
            e);
    return metricsMap;
} catch (Exception e) {
    log.error("Unexpected error while processing job metrics: {}", e.getMessage(), e);
    return metricsMap;
}
```
**Key Improvements**:
1. Log detailed error with raw JSON sample for debugging
2. Wrap individual metric processing to prevent one failure from killing all
metrics
3. Add catch for unexpected exceptions
4. Truncate long JSON in logs to avoid log explosion
---
## 🟡 Important Issues (Strongly Recommended)
### Issue #4: Method Naming is Inaccurate 🟡 **MEDIUM**
**Location**:
[BaseService.java:495-506](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java#L495-L506)
**Problem**: Method named `extractSinkIdentifier` but actually extracts
Source, Sink, AND Transform identifiers.
**Current Code**:
```java
private String extractSinkIdentifier(String vertexName) {
    if (vertexName == null) {
        return "";
    }
    Pattern pattern = Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");
    Matcher matcher = pattern.matcher(vertexName);
    if (matcher.find()) {
        return matcher.group(1);
    }
    return vertexName;
}
```
**Suggested Fix**:
```java
/**
 * Extracts the vertex identifier (Source[n], Sink[n], or Transform[n]) from a vertex name.
 * <p>
 * This method is used to distinguish metrics from multiple vertices processing the same table.
 * For example, if two sinks write to "user_table", their metrics are distinguished as
 * "Sink[0].user_table" and "Sink[1].user_table".
 * </p>
 *
 * @param vertexName the vertex name from JobDAGInfo, typically in the format
 *     "pipeline-1 [Sink[0]-console-MultiTableSink]"
 * @return the extracted identifier (e.g., "Sink[0]"), or the original name if no match is found
 */
private String extractVertexIdentifier(String vertexName) {
    if (vertexName == null || vertexName.isEmpty()) {
        log.debug("Null or empty vertex name provided");
        return "";
    }
    Matcher matcher = VERTEX_IDENTIFIER_PATTERN.matcher(vertexName);
    if (matcher.find()) {
        String identifier = matcher.group(1);
        log.trace("Extracted vertex identifier '{}' from '{}'", identifier, vertexName);
        return identifier;
    }
    log.debug("Failed to extract vertex identifier from '{}', using original name", vertexName);
    return vertexName;
}
```
**Also Update**:
- Rename all call sites: `extractSinkIdentifier` → `extractVertexIdentifier`
- Update variable names for clarity
---
### Issue #5: Regex Pattern Not Cached 🟡 **MEDIUM** (Performance)
**Location**:
[BaseService.java:500](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java#L500)
**Problem**: `Pattern.compile()` is called on every invocation, which is
expensive.
**Current Code**:
```java
Pattern pattern = Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");
```
**Impact**:
- In large DAGs (100+ vertices), this adds noticeable overhead
- Pattern compilation is expensive (regex parsing + optimization)
- Called once per vertex per metrics query
**Suggested Fix**:
```java
// Add as a class-level constant
private static final Pattern VERTEX_IDENTIFIER_PATTERN =
        Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");

private String extractVertexIdentifier(String vertexName) {
    if (vertexName == null || vertexName.isEmpty()) {
        return "";
    }
    Matcher matcher = VERTEX_IDENTIFIER_PATTERN.matcher(vertexName);
    if (matcher.find()) {
        return matcher.group(1);
    }
    return vertexName;
}
```
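As a rough, self-contained timing sketch (not a rigorous benchmark; the class name and iteration count are illustrative), the difference between compiling per call and reusing the constant can be measured like this:

```java
import java.util.regex.Pattern;

public class PatternCacheDemo {

    private static final Pattern CACHED =
            Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");

    public static void main(String[] args) {
        String vertexName = "pipeline-1 [Sink[0]-console-MultiTableSink]";
        int iterations = 100_000;

        // Compile the pattern on every call, as the current code does
        long t0 = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])").matcher(vertexName).find();
        }
        long uncachedMs = (System.nanoTime() - t0) / 1_000_000;

        // Reuse the precompiled constant
        long t1 = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            CACHED.matcher(vertexName).find();
        }
        long cachedMs = (System.nanoTime() - t1) / 1_000_000;

        System.out.printf("uncached: %d ms, cached: %d ms%n", uncachedMs, cachedMs);
    }
}
```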
---
### Issue #6: Missing Critical Logging 🟡 **MEDIUM**
**Problem**: Key decision points have no logging, making production issues
difficult to diagnose.
**Missing Logs**:
1. When identifier extraction fails
2. When array size doesn't match sink count
3. When metrics are assigned to specific sinks
4. When falling back to table name only
**Suggested Additions**: (Already included in Issue #2 fix above)
---
### Issue #7: Missing Edge Case Tests 🟡 **MEDIUM**
**Problem**: Test coverage doesn't include critical edge cases that will
occur in production.
**Missing Test Scenarios**:
```java
@Test
public void testMetricsWithArraySizeMismatch_FewerMetricsThanSinks() throws Exception {
    // Scenario: 2 sinks configured, but only 1 has reported metrics (common during startup)
    String jobMetrics =
            "{"
                    + "\"SinkWriteCount#fake.user_table\": [{\"value\": 100}]," // Only 1 metric
                    + "\"SinkWriteCount\": [{\"value\": 100}]"
                    + "}";
    JobDAGInfo dagInfo = createDAGInfoWithMultipleSinks(); // 2 sinks

    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, jobMetrics, dagInfo);
    Map<String, Object> tableSinkCount =
            (Map<String, Object>) result.get("TableSinkWriteCount");

    // Should have exactly 1 entry for Sink[0]
    Assertions.assertEquals(1, tableSinkCount.size());
    Assertions.assertTrue(tableSinkCount.containsKey("Sink[0].fake.user_table"));
    Assertions.assertEquals(100L, tableSinkCount.get("Sink[0].fake.user_table"));
    // Should NOT have entry for Sink[1] (not reporting yet)
    Assertions.assertFalse(tableSinkCount.containsKey("Sink[1].fake.user_table"));
}

@Test
public void testMetricsWithMalformedJSON() throws Exception {
    // Scenario: Invalid JSON from metrics collection
    String malformedMetrics = "{\"SinkWriteCount#table\": [invalid}";
    JobDAGInfo dagInfo = createDAGInfoWithMultipleSinks();

    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, malformedMetrics, dagInfo);

    // Should return empty map, but not crash
    Assertions.assertNotNull(result);
    // Verify error was logged (would require log capture framework)
}

@Test
public void testMetricsWithNullJobDAGInfo() throws Exception {
    // Scenario: Job just started, DAG info not available yet
    String jobMetrics =
            "{"
                    + "\"SinkWriteCount#fake.user_table\": [{\"value\": 100}],"
                    + "\"SinkWriteCount\": [{\"value\": 100}]"
                    + "}";

    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, jobMetrics, null);
    Map<String, Object> tableSinkCount =
            (Map<String, Object>) result.get("TableSinkWriteCount");

    // Should fall back to table name only
    Assertions.assertTrue(tableSinkCount.containsKey("fake.user_table"));
}

@Test
public void testMetricsWithEmptyVertexName() throws Exception {
    // Scenario: Malformed vertex info with empty connector type
    // ... create DAG with empty vertex name ...
}

@Test
public void testMetricsPerformanceWithLargeDAG() throws Exception {
    // Scenario: Large-scale production job with 100+ vertices
    JobDAGInfo largeDAG = createLargeDAG(100, 1000); // 100 vertices, 1000 tables
    String jobMetrics = createMockMetrics(1000); // Large metrics JSON

    long startTime = System.currentTimeMillis();
    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, jobMetrics, largeDAG);
    long duration = System.currentTimeMillis() - startTime;

    Assertions.assertTrue(
            duration < 200,
            "Metrics processing should complete within 200ms, but took " + duration + "ms");
}
```
---
## Optional Improvements (Quality Enhancement)
### Issue #8: Missing Javadoc Comments **LOW**
**Problem**: Core methods lack proper Javadoc documentation.
**Methods Missing Javadoc**:
- `getJobMetrics`
- `processMetric`
- `putMetricToMap`
- `extractSinkIdentifier` / `extractVertexIdentifier`
**Example**:
```java
/**
 * Retrieves and processes job metrics, adding vertex identifiers for multi-sink scenarios.
 * <p>
 * When multiple sinks process the same table, this method distinguishes their metrics by
 * prepending the vertex identifier (e.g., "Sink[0].table", "Sink[1].table").
 * </p>
 *
 * @param jobMetrics raw metrics JSON string from the job
 * @param jobDAGInfo DAG information containing vertex details (can be null for backward
 *     compatibility)
 * @return map of processed metrics with proper identifiers
 */
private Map<String, Object> getJobMetrics(String jobMetrics, JobDAGInfo jobDAGInfo) {
    // ...
}
```