DanielCarter-stack commented on PR #10572:
URL: https://github.com/apache/seatunnel/pull/10572#issuecomment-4012066950
### Issue 1: Thread Safety in Metrics Collection
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/observability/RealtimeMetricsService.java:152-167`
**Related Context**:
- Caller:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:953-970`
- Concurrent Entry Points:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/RealtimeMetricsServlet.java:55-61`
**Problem Description**:
In the `collectRealtimeMetrics` method, the `vertices` collection is
traversed to collect metrics. If the collection is modified by another thread
during traversal (e.g., task rescheduling), it may cause
`ConcurrentModificationException` or data inconsistency.
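The failure mode can be reproduced deterministically in a single thread, which may help when writing a regression test. The sketch below is illustrative only: `IterationSafetyDemo` is a hypothetical class, plain strings stand in for `PhysicalVertex`, and the mutation happens inside the loop rather than on another thread. Across threads the fail-fast check is best-effort, so the exception is not guaranteed to appear.

```java
import java.util.ConcurrentModificationException;
import java.util.List;

/** Hypothetical demo class; strings stand in for PhysicalVertex instances. */
final class IterationSafetyDemo {

    /**
     * Returns true if mutating the list while iterating over it throws
     * ConcurrentModificationException, false if the iteration completes.
     */
    static boolean failsWhenModifiedDuringIteration(List<String> vertices) {
        try {
            for (String vertex : vertices) {
                // Simulates another code path (e.g. rescheduling) mutating the collection.
                vertices.add(vertex + "-rescheduled");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

Passing a non-empty `ArrayList` makes the method return `true`, while a `CopyOnWriteArrayList` iterates over a snapshot and returns `false`. Note that a defensive copy such as `new ArrayList<>(vertices)` still reads the source collection while copying, so it is only safe when the source tolerates concurrent reads.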
**Potential Risks**:
- Risk 1: Multiple HTTP requests simultaneously calling
`collectRealtimeMetrics`, leading to concurrent access
- Risk 2: Task state changes during metrics collection, causing data
inconsistency
**Scope of Impact**:
- Direct Impact: `RealtimeMetricsService.collectRealtimeMetrics()`
- Indirect Impact: All REST API endpoints that depend on this method
- Impact Area: Core Framework
**Severity**: MAJOR
**Improvement Suggestions**:
```java
public CompletableFuture<CollectResult> collectRealtimeMetrics(
        Collection<PhysicalVertex> vertices) {
    // Create defensive copy
    List<PhysicalVertex> verticesCopy = new ArrayList<>(vertices);
    Map<Long, PipelineMetrics> pipelineMetricsMap = new ConcurrentHashMap<>();
    Map<String, QueueMetrics> queueMetricsMap = new ConcurrentHashMap<>();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (PhysicalVertex vertex : verticesCopy) {
        // ... collection logic
    }
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> new CollectResult(pipelineMetricsMap, queueMetricsMap));
}
```
**Rationale**:
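- Iterate over a defensive copy so that later changes to the source collection
cannot affect the traversal
- Use `ConcurrentHashMap` so that asynchronous callbacks can write results
safely
- Aggregate the per-vertex futures with `CompletableFuture.allOf` so the result
completes only after every collection has finished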
---
### Issue 2: Precision Issues in Metrics Calculation
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:227-237`
**Related Context**:
- TypeScript Version:
`seatunnel-engine/seatunnel-engine-ui/src/utils/format.ts:19-28`
- Usage: `BaseService.java:185-195`
**Problem Description**:
The `formatPercentage` method performs plain `double` division and returns the
unrounded result. For example, when `used = 1` and `total = 3`, the result is
`33.33333333333333`, which is too long to display cleanly on the frontend.
**Potential Risks**:
- Risk 1: Percentages displayed on the frontend may be too long, affecting
user experience
- Risk 2: Accumulated errors from multiple calculations
**Scope of Impact**:
- Direct Impact: All percentage displays (queue utilization, task busyness)
- Indirect Impact: UI display effects
- Impact Area: Web UI, REST API
**Severity**: MINOR
**Improvement Suggestions**:
```java
public static double formatPercentage(long used, long total) {
    if (total == 0) {
        return 0;
    }
    if (used < 0 || total < 0) {
        logger.warn("Invalid values: used={}, total={}", used, total);
        return 0;
    }
    // Keep 2 decimal places
    return Math.round((double) used / total * 100 * 100) / 100.0;
}
```
**Rationale**:
- Add input validation
- Use `Math.round` to keep 2 decimal places
- Maintain consistent precision handling between frontend and backend
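If a fixed number of decimal places matters for display (for example `50.00` rather than `50.0`), one alternative is to compute the percentage with `BigDecimal`. This is a minimal sketch under that assumption, not the method in the PR; `PercentageSketch` and `percentage` are hypothetical names.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper showing fixed-scale percentage formatting. */
final class PercentageSketch {

    /** Returns used / total as a percentage with exactly two decimal places. */
    static BigDecimal percentage(long used, long total) {
        if (total <= 0 || used < 0) {
            // Invalid or empty input is reported as 0.00 in this sketch.
            return BigDecimal.ZERO.setScale(2);
        }
        return BigDecimal.valueOf(used)
                .multiply(BigDecimal.valueOf(100))
                .divide(BigDecimal.valueOf(total), 2, RoundingMode.HALF_UP);
    }
}
```

With this sketch, `percentage(1, 3).toPlainString()` yields `33.33` and `percentage(2, 3).toPlainString()` yields `66.67`. Whether the REST API should return a string or a number is a separate design decision for the PR author.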
---
### Issue 3: Incomplete Exception Handling
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/observability/RealtimeMetricsService.java:152-167`
**Related Context**:
- Exception Source:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java:37-61`
**Problem Description**:
During metrics collection, if a certain node fails, only a warning log is
recorded while continuing to process other nodes. This prevents the caller from
knowing which nodes' metrics collection failed, and from distinguishing failure
causes (timeout, network error, node failure, etc.).
**Potential Risks**:
- Risk 1: Incomplete data is returned, so users may mistakenly believe the
system is functioning normally
- Risk 2: Node failures cannot be detected in a timely manner
- Risk 3: Issues are difficult to troubleshoot
**Scope of Impact**:
- Direct Impact: `RealtimeMetricsService.collectRealtimeMetrics()`
- Indirect Impact: All features that depend on metrics
- Impact Area: Core Framework
**Severity**: MAJOR
**Improvement Suggestions**:
```java
public CompletableFuture<CollectResult> collectRealtimeMetrics(
        Collection<PhysicalVertex> vertices) {
    Map<Long, PipelineMetrics> pipelineMetricsMap = new HashMap<>();
    Map<String, QueueMetrics> queueMetricsMap = new HashMap<>();
    List<String> failedVertices = new ArrayList<>();
    for (PhysicalVertex vertex : vertices) {
        try {
            CompletableFuture<Void> future = (CompletableFuture<Void>)
                    vertex.executeOperation(new GetMetricsOperation());
            // Add timeout; get() blocks the calling thread for up to 5 seconds per vertex
            future.get(5, TimeUnit.SECONDS);
            // ... collection logic
        } catch (TimeoutException e) {
            logger.error("Timeout collecting metrics from vertex: {}", vertex.getTaskName());
            failedVertices.add(vertex.getTaskName() + "(timeout)");
        } catch (ExecutionException e) {
            logger.error("Failed to collect metrics from vertex: {}, error: {}",
                    vertex.getTaskName(), e.getCause().getMessage());
            failedVertices.add(vertex.getTaskName() + "(error)");
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating
            Thread.currentThread().interrupt();
            logger.error("Interrupted while collecting metrics");
            throw new RuntimeException("Interrupted", e);
        }
    }
    CollectResult result = new CollectResult(pipelineMetricsMap, queueMetricsMap);
    result.setFailedVertices(failedVertices);
    return CompletableFuture.completedFuture(result);
}
```
**Rationale**:
- Distinguish between different exception types
- Record failed node information
- Add timeout mechanism
- Properly handle `InterruptedException`
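Failures can also be recorded without blocking on each future in turn. The sketch below is one possible shape, assuming each source exposes a `CompletableFuture`; `PartialResultSketch`, `Result`, and `collect` are hypothetical names, and `Long` values stand in for the real metrics types. Timeouts are out of scope here: a future that never completes would keep the aggregate pending, so the caller would still need to bound the wait.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch: gather successes and failures without blocking per source. */
final class PartialResultSketch {

    /** Hypothetical result type: collected values plus the names of failed sources. */
    static final class Result {
        final Map<String, Long> values = new ConcurrentHashMap<>();
        final Queue<String> failed = new ConcurrentLinkedQueue<>();
    }

    static CompletableFuture<Result> collect(Map<String, CompletableFuture<Long>> pending) {
        Result result = new Result();
        List<CompletableFuture<Void>> handled = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<Long>> entry : pending.entrySet()) {
            String name = entry.getKey();
            // handle() converts both outcomes into a normally completed stage.
            CompletableFuture<Void> stage = entry.getValue().handle((value, error) -> {
                if (error == null) {
                    result.values.put(name, value);
                } else {
                    // Dependent stages wrap failures in CompletionException; unwrap if so.
                    Throwable cause = error instanceof CompletionException && error.getCause() != null
                            ? error.getCause() : error;
                    result.failed.add(name + "(" + cause.getClass().getSimpleName() + ")");
                }
                return null;
            });
            handled.add(stage);
        }
        // Completes once every source has either succeeded or been recorded as failed.
        return CompletableFuture.allOf(handled.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> result);
    }
}
```

Because every per-source stage completes normally, the aggregate future never fails on a single bad node, and the caller can inspect `failed` to decide whether the data is complete enough to display.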
---
### Issue 4: Insufficient Performance Optimization
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/observability/RealtimeMetricsService.java:152-167`
**Related Context**:
- Caller:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/RealtimeMetricsServlet.java:55-61`
- Possible Call Frequency: Every 5-10 seconds (frontend polling)
**Problem Description**:
The current implementation synchronously collects all metrics on every HTTP
request, including:
1. Traversing all `PhysicalVertex`
2. Sending RPC requests to each node
3. Waiting for all responses
This leads to:
1. Long response times (potentially several seconds)
2. Significant performance overhead during high-frequency calls
3. Increased GC pressure
**Potential Risks**:
- Risk 1: HTTP request timeouts
- Risk 2: Impact on normal task processing performance
- Risk 3: User interface lag
**Scope of Impact**:
- Direct Impact: `RealtimeMetricsService`
- Indirect Impact: Overall system performance
- Impact Area: Core Framework
**Severity**: MAJOR
**Improvement Suggestions**:
```java
public class RealtimeMetricsService {
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1);
    private volatile CollectResult cachedMetrics;
    private final long cacheIntervalMs = 5000; // 5 second cache

    @PostConstruct
    public void init() {
        // Background periodic cache update
        scheduler.scheduleAtFixedRate(
                this::updateMetricsCache, 0, cacheIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void updateMetricsCache() {
        // Asynchronously collect metrics and update cache.
        // currentVertices() is a placeholder for however the service obtains
        // the vertices to poll; it is not an existing method.
        collectRealtimeMetricsInternal(currentVertices())
                .thenAccept(result -> cachedMetrics = result);
    }

    public CompletableFuture<CollectResult> collectRealtimeMetrics(
            Collection<PhysicalVertex> vertices) {
        // Return the cache directly; collect on demand if the cache is empty
        CollectResult cached = cachedMetrics;
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return collectRealtimeMetricsInternal(vertices);
    }
}
```
**Rationale**:
- Use background threads to periodically update cache
- Reduce response time
- Minimize impact on main business logic
- Bound data staleness to roughly one refresh interval (5 seconds)
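A lighter alternative to a scheduled refresh is a cache that reloads lazily on read once its entry is older than a time-to-live. This is a different technique from the background refresh suggested above, and the sketch below is only an illustration: `TtlCache` is a hypothetical class, and the injectable clock exists so the behavior can be tested without sleeping.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical lazily refreshed cache with a fixed time-to-live. */
final class TtlCache<T> {
    private final Supplier<T> loader;
    private final LongSupplier clockMillis;
    private final long ttlMillis;
    private T value;
    private long loadedAtMillis;
    private boolean loaded;

    TtlCache(Supplier<T> loader, long ttlMillis, LongSupplier clockMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clockMillis = clockMillis;
    }

    /** Returns the cached value, reloading it first if it is missing or expired. */
    synchronized T get() {
        long now = clockMillis.getAsLong();
        if (!loaded || now - loadedAtMillis >= ttlMillis) {
            // The lock is held during the load, so concurrent readers wait for one reload.
            value = loader.get();
            loadedAtMillis = now;
            loaded = true;
        }
        return value;
    }
}
```

In production the clock would typically be `System::currentTimeMillis`. The trade-off is that the first request after expiry pays the full collection cost, whereas the scheduled approach keeps every request fast but does work even when nobody is polling.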
---
### Issue 5: Insufficient Unit Test Coverage
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/observability/RealtimeMetricsServiceTest.java`
**Related Context**:
- Class Under Test: `RealtimeMetricsService.java`
**Problem Description**:
Current unit tests only cover normal scenarios, lacking the following tests:
1. Boundary condition tests (`total = 0`, `used > total`)
2. Concurrent access tests (multiple threads calling simultaneously)
3. Exception scenario tests (node failure, timeout)
4. Performance tests (metrics collection for large numbers of tasks)
**Potential Risks**:
- Risk 1: Bugs under boundary conditions not discovered
- Risk 2: Concurrency issues not exposed during testing phase
- Risk 3: Unexpected errors in production environment
**Scope of Impact**:
- Direct Impact: Code quality
- Indirect Impact: System stability
- Impact Area: Core Framework
**Severity**: MAJOR
**Improvement Suggestions**:
```java
public class RealtimeMetricsServiceTest {
    @Test
    public void testFormatPercentageWithZeroTotal() {
        double result = BaseService.formatPercentage(10, 0);
        assertEquals(0.0, result, 0.001);
    }

    @Test
    public void testFormatPercentageWithNegativeValues() {
        double result = BaseService.formatPercentage(-10, 100);
        assertEquals(0.0, result, 0.001);
    }

    @Test
    public void testConcurrentMetricsCollection() throws Exception {
        // Simulate multi-threaded concurrent calls
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            List<Future<CollectResult>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                // join() unwraps the CompletableFuture so each Future carries a CollectResult
                futures.add(executor.submit(
                        () -> service.collectRealtimeMetrics(vertices).join()));
            }
            // Wait for all tasks to complete
            for (Future<CollectResult> future : futures) {
                CollectResult result = future.get();
                assertNotNull(result);
            }
        } finally {
            executor.shutdown();
        }
    }

    @Test
    public void testMetricsCollectionWithTimeout() {
        // Simulate timeout scenario.
        // CompletableFuture.failedFuture requires Java 9+; on Java 8, create a
        // CompletableFuture and call completeExceptionally instead.
        when(vertex.executeOperation(any())).thenReturn(
                CompletableFuture.failedFuture(new TimeoutException()));
        CollectResult result = service.collectRealtimeMetrics(vertices).join();
        // Verify timeout is handled correctly: a result is still returned
        assertNotNull(result);
    }
}
```
**Rationale**:
- Add boundary condition tests
- Add concurrency tests
- Add exception scenario tests
- Improve code quality
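Concurrency tests tend to be more revealing when all callers start at the same moment instead of trickling in as the pool picks them up. One way to arrange that is a start gate built on `CountDownLatch`. The helper below is a hedged sketch rather than part of the PR; `ConcurrentCallHarness` and `callConcurrently` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: runs one task on N threads released at the same time. */
final class ConcurrentCallHarness {

    static <T> List<T> callConcurrently(int threads, Callable<T> task) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch startGate = new CountDownLatch(1);
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(executor.submit(() -> {
                    startGate.await(); // every worker parks here until the gate opens
                    return task.call();
                }));
            }
            startGate.countDown(); // release all workers together
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                // Bound the wait so a deadlock fails the test instead of hanging it.
                results.add(future.get(10, TimeUnit.SECONDS));
            }
            return results;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Concurrent call failed", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A test could then call something like `callConcurrently(10, () -> service.collectRealtimeMetrics(vertices).join())` and assert on each returned result, where `service` and `vertices` are the fixtures assumed in the test class above.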
---
### Issue 6: Incomplete Configuration Default Value Handling
**Location**:
`seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java`
**Related Context**:
- Configuration Class:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/observability/ObservabilityConfig.java:88-97`
**Problem Description**:
`ObservabilityConfig` defines configuration items, but no default values are
explicitly set in `MultipleTableJobConfigParser`. If users do not configure
them, `null` may be used instead of default configuration, leading to null
pointer exceptions.
**Potential Risks**:
- Risk 1: An NPE occurs when users omit the observability settings
- Risk 2: Inconsistent configuration behavior
**Scope of Impact**:
- Direct Impact: Job configuration parsing
- Indirect Impact: Feature availability
- Impact Area: Core Framework
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Add in MultipleTableJobConfigParser
private ObservabilityConfig parseObservabilityConfig(Config config) {
    ObservabilityConfig observabilityConfig = new ObservabilityConfig();
    if (config.hasPath("observability.enabled")) {
        observabilityConfig.setEnabled(config.getBoolean("observability.enabled"));
    } else {
        observabilityConfig.setEnabled(false); // Explicitly set default value
    }
    // ... other configuration items
    return observabilityConfig;
}
```
**Rationale**:
- Explicitly set default values
- Avoid NPE
- Improve code robustness
--
This is an automated message from the Apache Git Service.