DanielCarter-stack commented on PR #10433:
URL: https://github.com/apache/seatunnel/pull/10433#issuecomment-3832902373
### Issue 1: path.getParent() may return null causing NPE
**Location**: `HadoopFileSystemProxy.java:210`
**Modified code**:
```java
IOException enhanced =
enhanceMkdirsException(
fs, path.getParent(), "create file " + path.getName(), e);
```
**Related context**:
- Callers: all WriteStrategy (18+ locations)
- Call chain: `WriteStrategy` → `getOutputStream()` → `FileSystem.create()`
→ `enhanceMkdirsException()`
**Problem description**:
When `filePath` is a relative path (e.g., `"test.txt"`) or root path (e.g.,
`"/"`), `path.getParent()` returns `null`. In the `enhanceMkdirsException()`
method (L297), the code executes:
```java
Path parent = path.getParent();
if (parent != null && !fs.exists(parent)) { // If parent is null, this branch is skipped
reason.append("Parent directory does not exist: ").append(parent).append(". ");
} else {
reason.append("Directory does not exist and creation failed: ")
.append(path)
.append(". ");
}
```
Although L297 has a null check, the subsequent L306-L316 will still call
`fs.getFileStatus(path)`, which may fail in certain edge cases.
**Potential risks**:
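1. **Risk 1**: If the `path` argument passed to `enhanceMkdirsException()` is
`null` (which happens when `path.getParent()` returns `null`), `fs.exists(path)`
is likely to throw an NPE. `filePath` itself is `@NonNull`, and
`new Path((String) null)` is rejected with an `IllegalArgumentException`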
2. **Risk 2**: Error message is inaccurate: for relative paths, the parent
directory is the current working directory, but the error message will display
"Parent directory does not exist: null"
3. **Risk 3**: At `enhanceMkdirsException()` L298, if `parent` is null, the
error message will be confusing
**Impact scope**:
- **Direct impact**: `getOutputStream()` method
- **Indirect impact**: all WriteStrategy using relative paths to create files
- **Affected surface**: all File Connectors (~18 WriteStrategy
implementations)
**Severity**: **MAJOR** (medium-high)
**Improvement suggestion**:
```java
public FSDataOutputStream getOutputStream(String filePath) throws
IOException {
return execute(
() -> {
Path path = new Path(filePath);
FileSystem fs = getFileSystem();
try {
return fs.create(path, true);
} catch (IOException e) {
Path parent = path.getParent();
// Fix: Handle null parent
String pathContext = (parent != null) ? parent.toString() : "current directory";
IOException enhanced =
enhanceMkdirsException(
fs,
parent,
"create file " + path.getName() + " in " + pathContext,
e);
throw CommonError.fileOperationFailed(
"SeaTunnel", "create", filePath, enhanced);
}
});
}
// Also modify enhanceMkdirsException:
private IOException enhanceMkdirsException(
FileSystem fs, Path path, String operation, IOException cause)
throws IOException {
StringBuilder reason = new StringBuilder();
// Fix: Handle null path
if (path == null) {
reason.append("Path is null. ");
} else if (!fs.exists(path)) {
Path parent = path.getParent();
if (parent != null && !fs.exists(parent)) {
reason.append("Parent directory does not exist: ").append(parent).append(". ");
} else if (parent == null) {
reason.append("Path is in current directory. ");
} else {
reason.append("Directory does not exist and creation failed: ")
.append(path)
.append(". ");
}
try {
fs.getFileStatus(path);
} catch (IOException e) {
if (e.getMessage() != null) {
if (e.getMessage().contains("Permission denied")) {
reason.append("Permission denied. ");
} else {
reason.append("Hadoop error: ").append(e.getMessage()).append(". ");
}
}
}
} else {
reason.append("Path exists but may be inaccessible: ").append(path).append(". ");
}
reason.append("Operation: ")
.append(operation)
.append(". ")
.append("Current working directory: ")
.append(fs.getWorkingDirectory());
IOException enhanced = new IOException(reason.toString());
if (cause != null) {
enhanced.addSuppressed(cause);
}
return enhanced;
}
```
**Rationale**:
1. Defensive programming: handle the case where `path.getParent()` returns
`null`
2. More accurate error messages: for relative paths, indicate "current
directory" instead of "null"
3. Improved robustness: avoid potential NPE
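
The null-parent handling above can be illustrated without a Hadoop dependency. The following is a minimal sketch that uses the JDK's `java.nio.file.Path` as a stand-in for Hadoop's `Path`; the class `ParentDescriber` and the method `describeParent` are hypothetical names, and Hadoop's `Path.getParent()` may treat bare relative names differently from the JDK, so this shows the pattern rather than the exact behavior in `HadoopFileSystemProxy`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: produces a printable parent description that is never null.
final class ParentDescriber {
    private ParentDescriber() {}

    static String describeParent(Path path) {
        if (path == null) {
            return "(no path)";
        }
        // java.nio.file.Path#getParent returns null when the path has no parent,
        // which includes a bare file name such as "test.txt" and the root.
        Path parent = path.getParent();
        return parent != null ? parent.toString() : "current directory or root";
    }

    public static void main(String[] args) {
        System.out.println(describeParent(Paths.get("data", "out.txt")));
        System.out.println(describeParent(Paths.get("test.txt")));
        System.out.println(describeParent(null));
    }
}
```

Resolving the description once, before building the error message, keeps the literal string "null" out of user-facing diagnostics.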
---
### Issue 2: enhanceMkdirsException() method lacks JavaDoc
**Location**: `HadoopFileSystemProxy.java:292-332`
**Modified code**:
```java
private IOException enhanceMkdirsException(
FileSystem fs, Path path, String operation, IOException cause)
throws IOException {
// 45 lines of code, no JavaDoc
}
```
**Related context**:
- Other private methods in the same class: most have comments
- Callers: `createDir()` (L149), `getOutputStream()` (L209)
**Problem description**:
`enhanceMkdirsException()` is a complex private method (~45 lines),
involving multiple FileSystem API calls and error message parsing, but lacks
JavaDoc explaining its purpose, parameter meanings, and return value
conventions.
**Potential risks**:
1. **Risk 1**: Future maintainers may not understand the method's purpose
and incorrectly modify or delete it
2. **Risk 2**: The meaning of parameter `operation` is unclear (is it
"current operation" or "target operation")
3. **Risk 3**: The structure and format of the returned `IOException` is not
documented, making it difficult for callers to depend on
**Impact scope**:
- **Direct impact**: code maintainability
- **Indirect impact**: future extension and debugging
- **Affected surface**: single class, but affects long-term maintenance
**Severity**: **MINOR** (low)
**Improvement suggestion**:
```java
/**
* Enhances IOException with detailed diagnostic information for directory/file
* creation failures.
* <p>
* This method performs the following diagnostic checks:
* <ul>
* <li>Checks if the parent directory exists</li>
* <li>Detects permission denied errors from Hadoop</li>
* <li>Captures Hadoop-specific error messages</li>
* <li>Includes current working directory for context</li>
* </ul>
*
* @param fs the FileSystem instance to perform diagnostic checks on
* @param path the path that failed to create (can be null, e.g. when the target
* has no parent)
* @param operation the operation being performed (e.g., "create directory",
* "create file")
* @param cause the original IOException from FileSystem.mkdirs() or
* FileSystem.create() (can be null)
* @return an enhanced IOException with detailed diagnostic information in the
* message
* @throws IOException if diagnostic checks (e.g., fs.exists(),
* fs.getFileStatus()) fail
*
* @see CommonError#fileOperationFailed(String, String, String, Throwable)
*/
private IOException enhanceMkdirsException(
FileSystem fs, Path path, String operation, IOException cause)
throws IOException {
// ...
}
```
**Rationale**:
1. Apache top-level projects should have comprehensive documentation
2. Complex methods need clear contract descriptions
3. Facilitate understanding and extension by future maintainers
---
### Issue 3: Missing unit tests for this modification
**Location**:
`seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/`
**Modified code**:
No new test files
**Related context**:
- Existing tests: `HadoopFileSystemProxyKerberosRenewTest.java`
- Modified methods: `createDir()`, `getOutputStream()`, `renameFile()`
- New methods: `enhanceMkdirsException()`
**Problem description**:
The PR modifies core error handling logic but does not add corresponding
unit tests. According to Apache project standards, all new features and bug
fixes should have test coverage.
**Potential risks**:
1. **Risk 1**: Regression risk: future modifications may break this
improvement
2. **Risk 2**: Boundary conditions not tested: e.g., when `path.getParent()`
returns `null`
3. **Risk 3**: Cannot verify whether the fix is effective: e.g., whether the
"directory already exists" scenario really no longer throws exceptions
**Impact scope**:
- **Direct impact**: code quality assurance
- **Indirect impact**: confidence in future maintenance
- **Affected surface**: entire File Connector module
**Severity**: **MAJOR** (medium-high)
**Improvement suggestion**:
Create new test file `HadoopFileSystemProxyTest.java`:
```java
package org.apache.seatunnel.connectors.seatunnel.file.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import static org.junit.jupiter.api.Assertions.*;
class HadoopFileSystemProxyTest {
private HadoopFileSystemProxy proxy;
private HadoopConf hadoopConf;
@TempDir
java.nio.file.Path tempDir;
@BeforeEach
void setUp() throws IOException {
hadoopConf = new HadoopConf();
hadoopConf.setHdfsPath("file://" + tempDir.toString());
proxy = new HadoopFileSystemProxy(hadoopConf);
}
@AfterEach
void tearDown() throws IOException {
if (proxy != null) {
proxy.close();
}
}
@Test
void testCreateDirWhenAlreadyExists() throws IOException {
// Given: Directory already exists
String dirPath = tempDir.resolve("test-dir").toString();
proxy.createDir(dirPath);
// When: Create again
// Then: Should not throw exception
assertDoesNotThrow(() -> proxy.createDir(dirPath));
}
@Test
void testCreateDirWhenParentNotExists() {
// Given: Parent directory does not exist
String dirPath =
tempDir.resolve("non-existent-parent/child").toString();
// When & Then: Should throw exception with detailed information
SeaTunnelRuntimeException ex = assertThrows(
SeaTunnelRuntimeException.class,
() -> proxy.createDir(dirPath)
);
// Verify error message contains useful diagnostic information
assertTrue(ex.getMessage().contains("Parent directory does not exist") ||
ex.getMessage().contains("Hadoop error"));
}
@Test
void testGetOutputStreamWhenSuccess() throws IOException {
// Given: Parent directory exists
String filePath = tempDir.resolve("test-file.txt").toString();
// When: Create output stream
// Then: Should not throw exception
assertDoesNotThrow(() -> proxy.getOutputStream(filePath));
}
@Test
void testGetOutputStreamWhenParentNotExists() {
// Given: Parent directory does not exist
String filePath =
tempDir.resolve("non-existent-parent/file.txt").toString();
// When & Then: Should throw exception with parent directory information
SeaTunnelRuntimeException ex = assertThrows(
SeaTunnelRuntimeException.class,
() -> proxy.getOutputStream(filePath)
);
// Verify error message contains useful diagnostic information
String message = ex.getMessage();
assertTrue(message.contains("Parent directory") ||
message.contains("Hadoop error") ||
message.contains("Permission denied"));
}
@Test
void testGetOutputStreamWithRelativePath() throws IOException {
// Given: Relative path
String relativePath = "test-relative.txt";
// When: Create output stream
// Then: Should not throw NPE
assertDoesNotThrow(() -> proxy.getOutputStream(relativePath));
}
@Test
void testGetOutputStreamWithRootPath() {
// Given: Root path (Edge Case)
String rootPath = "/";
// When & Then: Should throw exception (but not NPE)
Exception ex = assertThrows(Exception.class, () -> {
proxy.getOutputStream(rootPath);
});
// Verify it's not NPE
assertFalse(ex instanceof NullPointerException);
}
}
```
**Rationale**:
1. Apache project standards: all code should have tests
2. Cover key scenarios: directory already exists, parent directory does not
exist, relative paths, etc.
3. Prevent regression: ensure future modifications won't break this
improvement
4. Verify fix: ensure the behavior of "directory already exists no longer
throws exceptions" is truly effective
---
### Issue 4: fs.exists() and fs.getFileStatus() in enhanceMkdirsException() may throw uncaught or swallowed exceptions
**Location**: `HadoopFileSystemProxy.java:296-316`
**Modified code**:
```java
if (!fs.exists(path)) {
Path parent = path.getParent();
if (parent != null && !fs.exists(parent)) {
reason.append("Parent directory does not exist: ").append(parent).append(". ");
} else {
reason.append("Directory does not exist and creation failed: ")
.append(path)
.append(". ");
}
try {
fs.getFileStatus(path); // A new IOException may be thrown here
} catch (IOException e) {
if (e.getMessage() != null) {
if (e.getMessage().contains("Permission denied")) {
reason.append("Permission denied. ");
} else {
reason.append("Hadoop error: ").append(e.getMessage()).append(". ");
}
}
}
}
```
**Related context**:
- Callers: `createDir()` (L149), `getOutputStream()` (L209)
- Hadoop FileSystem API: on remote file systems such as HDFS, both `exists()`
and `getFileStatus()` are remote calls
**Problem description**:
In `enhanceMkdirsException()`, both `fs.exists()` and `fs.getFileStatus()`
can be Hadoop RPC calls that may throw a new `IOException` (network failure,
NameNode downtime, etc.). The current code handles these exceptions
inconsistently:
1. `fs.exists()` calls (L296, L298): exceptions not caught, will be thrown
directly
2. `fs.getFileStatus()` call (L307): exceptions caught, but if
`e.getMessage()` is `null`, all error information will be lost
**Potential risks**:
1. **Risk 1**: If a network failure occurs during exception enhancement,
`fs.exists()` will throw a new exception, causing the original error message to
be lost
2. **Risk 2**: For `getFileStatus()`'s `IOException`, if `getMessage()`
returns `null`, error information will be lost
3. **Risk 3**: Nested RPC calls increase failure risk: original failure +
diagnostic call failure
**Impact scope**:
- **Direct impact**: reliability of error diagnostics
- **Indirect impact**: debugging difficulty (incomplete error messages)
- **Affected surface**: all scenarios using `createDir()` and
`getOutputStream()`
**Severity**: **MINOR** (low-medium)
**Improvement suggestion**:
```java
private IOException enhanceMkdirsException(
FileSystem fs, Path path, String operation, IOException cause)
throws IOException {
StringBuilder reason = new StringBuilder();
try {
if (!fs.exists(path)) {
Path parent = path.getParent();
if (parent != null && !fs.exists(parent)) {
reason.append("Parent directory does not exist: ").append(parent).append(". ");
} else if (parent == null) {
reason.append("Path is in current directory. ");
} else {
reason.append("Directory does not exist and creation failed: ")
.append(path)
.append(". ");
}
// Safely attempt to get detailed error
try {
fs.getFileStatus(path);
} catch (IOException e) {
String errorMsg = e.getMessage();
if (errorMsg != null && !errorMsg.isEmpty()) {
if (errorMsg.contains("Permission denied")) {
reason.append("Permission denied. ");
} else {
reason.append("Hadoop error: ").append(errorMsg).append(". ");
}
} else {
reason.append("Hadoop error: (no detailed message). ");
}
}
} else {
reason.append("Path exists but may be inaccessible: ").append(path).append(". ");
}
} catch (IOException diagnosticEx) {
// If diagnostic checks fail, fall back to basic information
reason.append("Failed to diagnose path: ")
.append(path)
.append(". Diagnostic error: ")
.append(diagnosticEx.getMessage())
.append(". ");
}
reason.append("Operation: ")
.append(operation)
.append(". ")
.append("Current working directory: ");
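// Safely get working directory. getWorkingDirectory() declares no checked
// exception, so only runtime failures need to be guarded against
try {
reason.append(fs.getWorkingDirectory());
} catch (RuntimeException e) {
reason.append("(unknown: ").append(e.getMessage()).append(")");
}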
IOException enhanced = new IOException(reason.toString());
if (cause != null) {
enhanced.addSuppressed(cause);
}
return enhanced;
}
```
**Rationale**:
1. Defensive programming: provide basic error information when diagnostic
calls fail
2. Preserve original exception: retain original `cause` through
`addSuppressed`
3. More robust: avoid throwing new uncaught exceptions during error handling
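
The defensive pattern described above can be sketched independently of Hadoop. In this minimal example every diagnostic check is modeled as a `Probe` that may itself fail; `SafeDiagnostics`, `Probe`, and `enhance` are hypothetical names and not part of the PR. A failing probe is recorded in the message instead of propagating, and the original exception is attached through `addSuppressed`, as in the suggestion.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: diagnostics that never mask the original failure.
final class SafeDiagnostics {
    /** A diagnostic check that may itself fail, e.g. a remote exists() call. */
    @FunctionalInterface
    interface Probe {
        String run() throws IOException;
    }

    private SafeDiagnostics() {}

    static IOException enhance(String operation, IOException cause, List<Probe> probes) {
        StringBuilder reason = new StringBuilder("Operation: ").append(operation).append(". ");
        for (Probe probe : probes) {
            try {
                reason.append(probe.run()).append(". ");
            } catch (IOException | RuntimeException diagnosticEx) {
                // Fall back to the exception type when the message is null or empty.
                String msg = diagnosticEx.getMessage();
                reason.append("Diagnostic failed: ")
                        .append(msg != null && !msg.isEmpty()
                                ? msg
                                : diagnosticEx.getClass().getSimpleName())
                        .append(". ");
            }
        }
        IOException enhanced = new IOException(reason.toString().trim());
        if (cause != null) {
            enhanced.addSuppressed(cause); // keep the original failure reachable
        }
        return enhanced;
    }

    public static void main(String[] args) {
        IOException original = new IOException("mkdirs failed");
        List<Probe> probes = Arrays.<Probe>asList(
                () -> "Parent directory does not exist: /data/out",
                () -> { throw new IOException("connection refused"); });
        System.out.println(enhance("create directory", original, probes).getMessage());
    }
}
```

Because `enhance` catches both `IOException` and `RuntimeException` around each probe, a failure in one check does not prevent the remaining checks from running.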
---
### Issue 5: Log warning level in renameFile() may be inappropriate
**Location**: `HadoopFileSystemProxy.java:106-111`
**Modified code**:
```java
if (!fileExist(oldPath.toString())) {
log.warn(
"rename file:[{}] to [{}] already finished in the last commit, skip. "
+ "WARNING: In cluster mode with LocalFile without shared storage, "
+ "the file may not be actually synced successfully, but the status shows success.",
oldPath,
newPath);
return Void.class;
}
```
**Related context**:
- Caller: `FileSinkAggregatedCommitter.commit()` (L59)
- Scenario: duplicate submission after checkpoint recovery
**Problem description**:
In `renameFile()`, if the source file does not exist (possibly because the
last commit has already completed), the code logs at `WARN` level. However,
this may be **normal behavior** rather than an exceptional situation in the
following scenarios:
1. Checkpoint recovery: after a job failure and restart, re-executing
commit, the file was already successfully renamed in the previous attempt
2. Idempotency: `FileSinkAggregatedCommitter.commit()` should be idempotent,
and the source file not existing is normal
Using the `WARN` level may lead to:
1. Log noise: normal checkpoint recovery generates large amounts of warnings
2. False positives: operations personnel may mistakenly believe there are
serious problems
**Potential risks**:
1. **Risk 1**: Log level misuse, masking real warnings
2. **Risk 2**: In large-scale clusters, frequent checkpoint recovery will
generate a large number of WARN logs
3. **Risk 3**: Log aggregation systems (e.g., ELK) may falsely report as
anomalies
**Impact scope**:
- **Direct impact**: log readability and monitoring alerts
- **Indirect impact**: operational efficiency
- **Affected surface**: all jobs using File Sink
**Severity**: **MINOR** (low)
**Improvement suggestion**:
**Option 1**: Change to INFO level (recommended)
```java
if (!fileExist(oldPath.toString())) {
log.info(
"rename file:[{}] to [{}] already finished in the last commit, skip. "
+ "INFO: In cluster mode with LocalFile without shared storage, "
+ "the file may not be actually synced successfully, but the status shows success.",
oldPath,
newPath);
return Void.class;
}
```
**Option 2**: Distinguish log levels based on scenario
```java
if (!fileExist(oldPath.toString())) {
// Check whether this is a checkpoint recovery scenario; isCheckpointRecovery()
// is a hypothetical helper whose result would be determined from context
if (isCheckpointRecovery()) {
log.debug("File [{}] already renamed to [{}], skipping", oldPath,
newPath);
} else {
log.warn(
"rename file:[{}] to [{}] already finished in the last commit, skip. "
+ "WARNING: In cluster mode with LocalFile without shared storage, "
+ "the file may not be actually synced successfully, but the status shows success.",
oldPath,
newPath);
}
return Void.class;
}
```
**Option 3**: Keep WARN, but clearly mark as "expected behavior"
```java
if (!fileExist(oldPath.toString())) {
log.warn(
"rename file:[{}] to [{}] already finished in the last commit, skip. "
+ "Note: This is expected during checkpoint recovery. "
+ "However, in cluster mode with LocalFile without shared storage, "
+ "the file may not be actually synced successfully, but the status shows success.",
oldPath,
newPath);
return Void.class;
}
```
**Rationale**:
1. Option 1 is simplest: INFO level is more appropriate for "duplicate skips
in idempotent operations"
2. Option 2 is most precise: distinguish between normal recovery and real
exceptions
3. Option 3 has minimal changes: keep WARN, but indicate it's expected
behavior
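
One possible way to tell a benign repeat commit from a genuinely missing file, sketched here with the JDK's `java.nio.file.Files` rather than the Hadoop API, is to check whether the rename target already exists. `IdempotentRename`, `Outcome`, and `levelFor` are hypothetical names, and the mapping to `java.util.logging` levels is an assumption for illustration; the PR itself does not do this.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.logging.Level;

// Hypothetical sketch: an idempotent rename that reports why it skipped.
final class IdempotentRename {
    enum Outcome { RENAMED, ALREADY_DONE, SOURCE_MISSING }

    private IdempotentRename() {}

    static Outcome rename(Path oldPath, Path newPath) throws IOException {
        if (Files.exists(oldPath)) {
            Files.move(oldPath, newPath, StandardCopyOption.REPLACE_EXISTING);
            return Outcome.RENAMED;
        }
        // Source is gone: a present target suggests a previous commit succeeded.
        return Files.exists(newPath) ? Outcome.ALREADY_DONE : Outcome.SOURCE_MISSING;
    }

    /** Assumed mapping: only the unexplained case is worth a warning. */
    static Level levelFor(Outcome outcome) {
        switch (outcome) {
            case ALREADY_DONE:
                return Level.INFO;
            case SOURCE_MISSING:
                return Level.WARNING;
            default:
                return Level.FINE;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rename-sketch");
        Path src = dir.resolve("part-0.tmp");
        Path dst = dir.resolve("part-0.txt");
        Files.write(src, new byte[] {1});
        System.out.println(rename(src, dst)); // first commit
        System.out.println(rename(src, dst)); // repeated commit after recovery
    }
}
```

On a node without shared storage the target check runs against the local disk only, so the cluster-mode caveat from the original log message still applies.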
---