DanielCarter-stack commented on PR #10378:
URL: https://github.com/apache/seatunnel/pull/10378#issuecomment-3794720378
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10378", "part": 1,
"total": 1} -->
### Issue 1: FileInfo class missing equals() and hashCode()
**Location**: `FileInfo.java:20-37`
**Related Context**:
- Usage location: `AbstractReadStrategy.java:82-136`
- Storage container: `ArrayList<FileInfo>` (current)
**Problem Description**:
The `FileInfo` class serves as a value object but does not override the
`equals()` and `hashCode()` methods, contrary to the guidance in Effective Java
for value classes. The current code only stores instances in an `ArrayList` and
never compares them, so nothing breaks today; however, any future use of
`HashSet`, `contains()`, `remove()`, etc. will silently fall back to reference
equality and produce incorrect results.
**Potential Risks**:
- Future code that stores instances in a `Set<FileInfo>` or calls
`fileInfoList.contains(new FileInfo(...))` will behave unexpectedly
- Two `FileInfo` objects with the same file name and modification time should
compare equal, but under the current implementation they never do
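
The reference-equality pitfall can be reproduced in isolation; the sketch below uses a hypothetical stand-in for `FileInfo` (same fields and constructor shape, no `equals()`/`hashCode()` override), not the connector class itself:

```java
import java.util.ArrayList;
import java.util.List;

public class FileInfoEqualityDemo {

    // Minimal stand-in for the reviewed FileInfo class (hypothetical);
    // like the real class, it does NOT override equals()/hashCode().
    static class FileInfo {
        final String fileName;
        final long modifyTime;

        FileInfo(String fileName, long modifyTime) {
            this.fileName = fileName;
            this.modifyTime = modifyTime;
        }
    }

    // contains() delegates to equals(); without an override this is
    // reference equality, so a value-identical instance is never "found".
    public static boolean containsEqualValue() {
        List<FileInfo> files = new ArrayList<>();
        files.add(new FileInfo("/data/a.parquet", 1_000L));
        return files.contains(new FileInfo("/data/a.parquet", 1_000L));
    }

    public static void main(String[] args) {
        System.out.println(containsEqualValue()); // prints "false"
    }
}
```

With the suggested `equals()`/`hashCode()` override in place, the same call would return `true`.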
**Impact Scope**:
- Direct impact: Users of the `FileInfo` class
- Indirect impact: Any scenarios requiring comparison of `FileInfo` objects
- Affected area: connector-file-base module
**Severity**: MINOR
**Improvement Suggestions**:
```java
// requires: import java.util.Objects;
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    FileInfo fileInfo = (FileInfo) o;
    return modifyTime == fileInfo.modifyTime
            && Objects.equals(fileName, fileInfo.fileName);
}

@Override
public int hashCode() {
    return Objects.hash(fileName, modifyTime);
}
```
**Rationale**: Follow Java best practices to ensure semantic correctness of
`FileInfo` as a value object and avoid future bugs.
---
### Issue 2: Performance impact of sorting operation not evaluated
**Location**: `AbstractReadStrategy.java:129-136`
**Related Context**:
- Caller: All implementation classes of `ReadStrategy`
- Impact: All file formats
**Problem Description**:
The newly added sorting operation has O(n log n) time complexity and may
introduce significant performance overhead for directories containing large
numbers of files. The code does not evaluate this performance impact, nor does
it provide a configuration option to allow users to disable sorting. For
certain scenarios where schema inference is not needed (e.g., when users
explicitly provide a schema), sorting is unnecessary overhead.
**Potential Risks**:
- For directories containing millions of files, sorting may add significant
startup latency
- In some scenarios (e.g., when users explicitly provide a schema), sorting
may not be necessary but cannot be skipped
- Interacting with HDFS NameNode to retrieve file lists is already an
expensive operation, and adding sorting further degrades performance
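
For scale context, the sort cost can be measured in isolation. The sketch below uses a hypothetical `FileEntry` stand-in and synthetic timestamps; absolute numbers are machine-dependent, but an in-memory sort of even a million entries is typically cheap relative to fetching the listing from the NameNode:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortCostSketch {

    // Hypothetical stand-in for the connector's FileInfo class.
    public static class FileEntry {
        final String fileName;
        final long modifyTime;

        public FileEntry(String fileName, long modifyTime) {
            this.fileName = fileName;
            this.modifyTime = modifyTime;
        }

        public long getModifyTime() {
            return modifyTime;
        }
    }

    // Same comparator shape the PR uses: newest (largest modifyTime) first.
    public static List<FileEntry> sortDescending(List<FileEntry> files) {
        files.sort(Comparator.comparingLong(FileEntry::getModifyTime).reversed());
        return files;
    }

    public static void main(String[] args) {
        List<FileEntry> files = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            // Deterministic pseudo-random modification times.
            files.add(new FileEntry("file-" + i, (i * 2_654_435_761L) % 1_000_000_007L));
        }
        long start = System.nanoTime();
        sortDescending(files);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Sorted " + files.size() + " entries in " + elapsedMs + " ms");
    }
}
```

This does not remove the case for a config switch; it just suggests the sort itself is unlikely to dominate next to the file-system listing.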
**Impact Scope**:
- Direct impact: All `ReadStrategy` implementations that use `getFileNamesByPath()`
- Indirect impact: All file formats including Parquet, Orc, Csv, Text, Json,
Xml, Excel, Binary, etc.
- Affected area: All connector-file users
**Severity**: MAJOR
**Improvement Suggestions**:
1. Add configuration option in `FileBaseSourceOptions`:
```java
public static final ConfigOption<Boolean> SORT_FILES_BY_MOD_TIME =
        ConfigOptions.key("sort_files_by_modification_time")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Sort files by modification time in descending order. "
                                + "Enable this when reading evolving schemas to ensure "
                                + "schema inference uses the latest file.");
```
2. Add conditional check in `AbstractReadStrategy`:
```java
protected boolean sortFilesByModTime = false;

// Read the configuration in init() or setPluginConfig()
if (readonlyConfig != null) {
    sortFilesByModTime = readonlyConfig.get(SORT_FILES_BY_MOD_TIME);
}

// Conditional sorting in getFileInfoListByPath()
if (sortFilesByModTime) {
    long startTime = System.currentTimeMillis();
    fileInfoList.sort(Comparator.comparingLong(FileInfo::getModifyTime).reversed());
    long duration = System.currentTimeMillis() - startTime;
    log.debug("Sorted {} files by modification time in {} ms",
            fileInfoList.size(), duration);
}
```
3. Add performance tests
**Rationale**: Allow users to choose whether to sort based on their scenario
through configuration options, avoiding unnecessary performance overhead.
Defaulting to disabled maintains backward compatibility.
---
### Issue 3: Inconsistent refactoring of setCatalogTable method
**Location**: `AbstractReadStrategy.java:148-150`
**Related Context**:
- Old code (dev branch): Uses `getPathForPartitionInference(null)`
- Parent class: `ReadStrategy` interface
- Subclasses: Multiple `ReadStrategy` implementation classes
**Problem Description**:
The new and old code are inconsistent in the implementation of the
`setCatalogTable()` method. The old code uses a helper method
`getPathForPartitionInference(null)` to obtain paths for partition inference,
while the new code directly uses `fileNames.get(0)`. This inconsistency may
break certain edge case handling, especially if the
`getPathForPartitionInference` method has special logic.
**Potential Risks**:
- If `getPathForPartitionInference` is overridden in certain subclasses or
has special logic, this logic is bypassed
- Different `ReadStrategy` implementations may have different path selection
requirements, and uniformly using `fileNames.get(0)` may not be suitable for
all scenarios
- Reduced code readability and maintainability (why not use the helper
method?)
**Impact Scope**:
- Direct impact: `AbstractReadStrategy` and all its subclasses
- Indirect impact: All scenarios using `setCatalogTable()`
- Affected area: All file connectors
**Severity**: MAJOR
**Improvement Suggestions**:
Retain the helper method but modify its implementation:
```java
// In AbstractReadStrategy
protected String getPathForPartitionInference(String defaultPath) {
    if (fileNames.isEmpty()) {
        return defaultPath;
    }
    return fileNames.get(0); // the first (latest) file after sorting
}

@Override
public void setCatalogTable(CatalogTable catalogTable) {
    this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
    this.seaTunnelRowTypeWithPartition =
            mergePartitionTypes(getPathForPartitionInference(null), this.seaTunnelRowType);
}
```
**Rationale**: Maintain code consistency and extensibility, allowing
subclasses to override the `getPathForPartitionInference()` method to implement
custom logic.
---
### Issue 4: Empty fileNames list risk not handled
**Location**: `AbstractReadStrategy.java:148-150`
**Related Context**:
- Caller: Upper-level components such as `FileSource`
- Dependency: `getFileNamesByPath()` must be called first
**Problem Description**:
The `setCatalogTable()` method directly calls `fileNames.get(0)` without
checking if `fileNames` is empty. If `getFileNamesByPath()` has not yet been
called or returns an empty list, it will throw `IndexOutOfBoundsException`, and
the error message is unclear, making it difficult for users to understand the
root cause.
**Potential Risks**:
- Incorrect call order (calling `setCatalogTable()` before
`getFileNamesByPath()`) will cause a crash
- When the directory is empty or no files match, `fileNames.get(0)` throws an
`IndexOutOfBoundsException`
- Error message is unclear, and users don't know if it's due to no files or
other reasons
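
The failure mode is easy to reproduce with a plain `ArrayList`, no connector classes involved:

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates the unhelpful failure: get(0) on an empty list throws
// IndexOutOfBoundsException with a message that says nothing about
// missing files or call ordering.
public class EmptyListDemo {

    public static String failureMessage() {
        List<String> fileNames = new ArrayList<>();
        try {
            fileNames.get(0); // what setCatalogTable() effectively does
            return null;
        } catch (IndexOutOfBoundsException e) {
            // e.g. "Index 0 out of bounds for length 0" on recent JDKs
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage());
    }
}
```

The exception text mentions only an index and a length, so a user cannot tell whether the directory was empty, the pattern matched nothing, or the methods were called in the wrong order.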
**Impact Scope**:
- Direct impact: All users of `ReadStrategy`
- Indirect impact: All file connector users
- Affected area: Production environment stability
**Severity**: MAJOR
**Improvement Suggestions**:
```java
@Override
public void setCatalogTable(CatalogTable catalogTable) {
    this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
    if (fileNames.isEmpty()) {
        log.warn(
                "No files available for partition type inference. "
                        + "Ensure getFileNamesByPath() is called before setCatalogTable(). "
                        + "Using provided schema without partition information.");
        this.seaTunnelRowTypeWithPartition = this.seaTunnelRowType;
    } else {
        this.seaTunnelRowTypeWithPartition =
                mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
    }
}
```
**Rationale**: Check in advance and provide clear error messages to avoid
crashes and improve system robustness.
---
### Issue 5: Missing compatibility documentation and migration guide
**Location**: PR description and project documentation
**Related Context**:
- File: `incompatible-changes.md`
- File: `docs/en/connector-v2/source/FileSource.md`
- Impact: All upgrading users
**Problem Description**:
The PR description states "Does this PR introduce any user-facing change?
NO", but it actually changes behavior: `getFileNamesByPath()` now returns a
file list sorted by modification time. This is an important behavior change,
but `incompatible-changes.md` has not been updated, and no migration guide is
provided.
**Potential Risks**:
- If users depend on implicit file processing order (although such
dependency itself is problematic), it will break after upgrade
- Users are unaware of this change and unexpected behavior may occur during
upgrade
- Lack of documentation prevents users from understanding new behavior and
how to configure
**Impact Scope**:
- Direct impact: All connector-file upgrading users
- Indirect impact: Data pipelines depending on file processing order
- Affected area: All production environment upgrades
**Severity**: MAJOR
**Improvement Suggestions**:
1. Update `incompatible-changes.md`:
```markdown
## File Connector
### File List Ordering Change (2.3.5)
**Change**: `getFileNamesByPath()` now returns files sorted by modification
time in descending order (newest first).
**Impact**: Users who relied on implicit file processing order may
experience different behavior.
**Migration**: If you need the old unordered behavior, you can:
- Use a specific file pattern to match only the files you want
- Explicitly specify the schema instead of relying on schema inference
- Check the upgrade notes for your specific use case
**Related Issue**: #10377
```
2. Update user documentation to explain the new sorting behavior and
configuration options (if added)
**Rationale**: Follow Apache project change management specifications to
ensure users understand upgrade impacts and reduce production environment
issues.
---
### Issue 6: No performance benchmarking conducted
**Location**: Entire PR
**Related Context**:
- Impact: All users using connector-file
- Key operations: File sorting, field lookup
**Problem Description**:
The PR does not provide performance test data or benchmark results. The
modifications introduce two operations that may affect performance:
1. Sorting file lists (O(n log n))
2. Parquet field access changed from index-based to field name lookup
The performance impact of these modifications cannot be quantified, and for
big data scenarios (millions of files), the impact may be significant.
**Potential Risks**:
- Performance regression may occur in production environments
- Unable to assess whether performance impact is acceptable
- Unable to identify performance bottlenecks
**Impact Scope**:
- Direct impact: All connector-file users
- Special impact: Users processing large numbers of files
- Affected area: Production performance
**Severity**: MAJOR
**Improvement Suggestions**:
Add performance benchmark tests:
```java
@Test
public void benchmarkFileSorting() {
    // Measure sorting time across increasing file counts
    int[] fileCounts = {100, 1000, 10000, 100000};
    for (int count : fileCounts) {
        List<FileInfo> files = generateTestFiles(count);
        long startTime = System.currentTimeMillis();
        files.sort(Comparator.comparingLong(FileInfo::getModifyTime).reversed());
        long duration = System.currentTimeMillis() - startTime;
        log.info("Sorted {} files in {} ms", count, duration);
    }
}

@Test
public void benchmarkParquetFieldAccess() {
    // Compare performance of index-based access vs field-name lookup.
    // Use JMH or a similar harness for reliable microbenchmark numbers.
}
```
**Rationale**: Performance data is an important indicator for evaluating PR
quality, especially in scenarios processing large amounts of data.
---
### Issue 7: Silent return of null when record.hasField() returns false
**Location**: `ParquetReadStrategy.java:157-160`
**Related Context**:
- Call chain: `read()` → `resolveObject()` → downstream processing
- Impact: Schema mismatch scenarios
**Problem Description**:
When an expected field does not exist in a Parquet record, the code silently
returns null without logging or warning. Users may not be aware that certain
fields are skipped, which is inconsistent with old behavior (which may have
thrown exceptions). This can lead to data inconsistency going unnoticed.
**Potential Risks**:
- When schemas don't match, data is silently skipped, making it difficult
for users to detect
- May lead to data quality issues (null values propagated downstream)
- Difficult to debug (no logs explaining why it's null)
**Impact Scope**:
- Direct impact: Parquet schema mismatch scenarios
- Indirect impact: Downstream data processing
- Affected area: Data quality
**Severity**: MAJOR
**Improvement Suggestions**:
```java
String fieldName = seaTunnelRowType.getFieldName(i);
Object data;
if (record.hasField(fieldName)) {
    data = record.get(fieldName);
} else {
    if (log.isDebugEnabled()) {
        log.debug("Field '{}' not found in parquet record at path {}. "
                        + "Record schema: {}. Expected field will be set to null.",
                fieldName, path, record.getSchema());
    }
    data = null;
}
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
```
Or for production environments, use WARN level:
```java
if (!record.hasField(fieldName)) {
    log.warn(
            "Field '{}' not found in parquet record. This may indicate a schema mismatch. "
                    + "Setting field value to null. File: {}",
            fieldName, path);
    data = null;
} else {
    data = record.get(fieldName);
}
```
**Rationale**: Provide sufficient observability to help users identify and
debug schema mismatch issues.
---
### Issue 8: FileInfo class missing JavaDoc
**Location**: `FileInfo.java:20-37`
**Related Context**:
- Package: `org.apache.seatunnel.connectors.seatunnel.file.config`
- Project standard: Apache code standards
**Problem Description**:
The `FileInfo` class is a new public class but lacks JavaDoc comments. This
does not comply with Apache project code standards. Other classes such as
`AbstractReadStrategy` have detailed JavaDoc, but `FileInfo` does not.
**Potential Risks**:
- Other developers are unclear about the class's purpose and usage scenarios
- Difficult to maintain (lacking documentation of design intent)
- Does not meet Apache project standards
**Impact Scope**:
- Direct impact: Code maintainability
- Indirect impact: Project quality standards
- Affected area: Development experience
**Severity**: MINOR
**Improvement Suggestions**:
```java
/**
 * Represents a file with its metadata information for schema inference.
 *
 * <p>This class is used to track file information during file listing operations,
 * particularly to support sorting by modification time. When reading files with
 * evolving schemas (e.g., Parquet files where new fields are added over time),
 * the latest file should be used for schema inference.
 *
 * <p>The modification time is obtained from {@link FileStatus#getModificationTime()}
 * and represents the time when the file was last modified, as reported by the file system.
 *
 * @since 2.3.5
 * @see FileStatus
 * @see AbstractReadStrategy#getFileInfoListByPath(String)
 */
public class FileInfo {

    /** The full path of the file. */
    private final String fileName;

    /**
     * The last modification time of the file in milliseconds since the epoch.
     * This value is obtained from the file system and may not be available
     * for all file system types.
     */
    private final long modifyTime;

    /**
     * Creates a FileInfo instance.
     *
     * @param fileName the full path of the file, typically from {@link FileStatus#getPath()}
     * @param modifyTime the last modification time in milliseconds since the epoch,
     *     from {@link FileStatus#getModificationTime()}
     * @throws NullPointerException if fileName is null
     */
    public FileInfo(String fileName, long modifyTime) {
        this.fileName = Objects.requireNonNull(fileName, "fileName cannot be null");
        this.modifyTime = modifyTime;
    }

    /**
     * Returns the file name (full path).
     *
     * @return the file name, never null
     */
    public String getFileName() {
        return fileName;
    }

    /**
     * Returns the last modification time.
     *
     * @return the modification time in milliseconds since the epoch
     */
    public long getModifyTime() {
        return modifyTime;
    }
}
```
**Rationale**: Follow Apache project code standards to improve code
maintainability and readability.
---
### Issue 9: Incomplete test coverage
**Location**: `ParquetReadStrategyTest.java`
**Related Context**:
- New tests: `testParquetSchemaMerge()`
- Covered scenarios: Old files + new files
**Problem Description**:
Test cases only cover schema evolution (field addition) scenarios, but do
not cover the following important scenarios:
1. Schema downgrade (new files have fewer fields than old files)
2. Field type changes (same field name with changed type)
3. Multi-file mixing (multiple different schema versions coexisting)
4. Empty directory (no files in directory)
5. Single file (scenario with only one file)
6. Boundary conditions (files have the same modification time)
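
For scenario 6 specifically, `List.sort` is specified to be stable, so files with equal modification times keep their original listing order. A standalone sketch (hypothetical `FileEntry` stand-in for `FileInfo`) verifying that assumption:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StableSortDemo {

    // Hypothetical stand-in for the connector's FileInfo class.
    public static class FileEntry {
        final String fileName;
        final long modifyTime;

        public FileEntry(String fileName, long modifyTime) {
            this.fileName = fileName;
            this.modifyTime = modifyTime;
        }
    }

    // Same descending-by-modifyTime sort the PR applies; List.sort is
    // guaranteed stable, so ties preserve the incoming order.
    public static List<String> sortedNames(List<FileEntry> files) {
        files.sort(Comparator.comparingLong((FileEntry f) -> f.modifyTime).reversed());
        List<String> names = new ArrayList<>();
        for (FileEntry f : files) {
            names.add(f.fileName);
        }
        return names;
    }

    public static void main(String[] args) {
        List<FileEntry> files = new ArrayList<>();
        files.add(new FileEntry("a.parquet", 100L));
        files.add(new FileEntry("b.parquet", 200L));
        files.add(new FileEntry("c.parquet", 200L)); // tie with b
        // b and c tie on modifyTime; stability keeps b before c.
        System.out.println(sortedNames(files)); // [b.parquet, c.parquet, a.parquet]
    }
}
```

Note that "original listing order" for equal timestamps is whatever the file system returned, which may itself be unspecified; a test for this scenario should assert determinism rather than one fixed ordering.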
**Potential Risks**:
- Edge cases may not be caught
- Unexpected behavior may occur in production environments
- Unable to guarantee code correctness in various scenarios
**Impact Scope**:
- Direct impact: Parquet connector reliability
- Indirect impact: User data quality
- Affected area: Production stability
**Severity**: MAJOR
**Improvement Suggestions**:
```java
@DisabledOnOs(OS.WINDOWS)
@Test
public void testParquetSchemaDowngrade() throws Exception {
    // New file has fewer fields than the old file
    AutoGenerateParquetDataWithSchemaDowngrade.generateTestData();
    // ... verify behavior
}

@DisabledOnOs(OS.WINDOWS)
@Test
public void testParquetSchemaFieldTypeChange() throws Exception {
    // Field type changes (e.g., int -> string)
    AutoGenerateParquetDataWithFieldTypeChange.generateTestData();
    // ... verify the handling approach
}

@DisabledOnOs(OS.WINDOWS)
@Test
public void testParquetSchemaMergeMultipleFiles() throws Exception {
    // Three files with different schemas: v1, v2, v3
    AutoGenerateParquetDataWithMultipleSchemas.generateTestData();
    // ... verify the latest schema is used
}

@DisabledOnOs(OS.WINDOWS)
@Test
public void testParquetEmptyDirectory() throws Exception {
    ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
    LocalFileSystemConf.LocalConf localConf =
            new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
    parquetReadStrategy.init(localConf);
    List<String> files = parquetReadStrategy.getFileNamesByPath("/tmp/empty_dir");
    Assertions.assertTrue(files.isEmpty());
}

@DisabledOnOs(OS.WINDOWS)
@Test
public void testParquetSingleFile() throws Exception {
    AutoGenerateParquetData.generateSingleFile();
    // ... verify it works normally
}

@DisabledOnOs(OS.WINDOWS)
@Test
public void testParquetSameModificationTime() throws Exception {
    // Multiple files share the same modification time
    AutoGenerateParquetDataWithSameModTime.generateTestData();
    // ... verify the ordering is stable and deterministic
}
```
**Rationale**: Comprehensive test coverage is key to ensuring code quality,
especially for complex scenarios like schema evolution.
---
### Issue 10: Missing end-to-end integration tests
**Location**: Test suite
**Related Context**:
- Existing: Unit tests
- Missing: E2E tests
**Problem Description**:
The PR only has unit tests and no end-to-end integration tests. The complete
read pipeline is not verified, and there are no integration tests with other
components (such as Sink). Unit tests passing does not mean there are no issues
after integration.
**Potential Risks**:
- Unit tests pass but there may be issues after integration
- Behavior in real-world scenarios is not verified
- May miss issues such as interface mismatches, configuration errors, etc.
**Impact Scope**:
- Direct impact: PR quality assurance
- Indirect impact: Production environment stability
- Affected area: Overall system quality
**Severity**: MAJOR
**Improvement Suggestions**:
Add integration tests in
`seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-e2e/`:
```java
// Sketch only: the execution/verification API below is illustrative.
// Actual e2e tests in seatunnel-e2e run jobs through the shared
// container-based test harness rather than Spring annotations.
public class FileParquetSchemaEvolutionE2EIT {

    @Test
    public void testParquetToConsoleSchemaEvolution() throws Exception {
        // 1. Prepare test data: create parquet files with different schemas
        prepareParquetFiles();
        // 2. Configure the SeaTunnel job: read from Parquet, write to Console
        String config = loadConfig("parquet_schema_evolution.conf");
        // 3. Execute the job
        SeaTunnelClient client = new SeaTunnelClient();
        JobExecution jobExecution = client.execute(config);
        // 4. Verify the result
        Assertions.assertEquals(JobExecution.Status.SUCCESS, jobExecution.getStatus());
        // 5. Cleanup
        cleanup();
    }
}
```
**Rationale**: E2E testing is key to validating complex scenarios, ensuring
the entire data flow works as expected.
---
### Issue 11: Missing logs and metrics for key operations
**Location**: `AbstractReadStrategy.java`, `ParquetReadStrategy.java`
**Related Context**:
- Sorting operation: `AbstractReadStrategy.java:129`
- Field missing: `ParquetReadStrategy.java:157`
**Problem Description**:
Lacks logging for key operations:
1. When file lists are sorted, there is no log record
2. Field missing silently returns null
3. No performance metrics such as file count, sorting time are recorded
**Potential Risks**:
- Users don't know sorting occurred
- Difficult to diagnose performance issues
- Difficult to debug schema mismatch issues
**Impact Scope**:
- Direct impact: Observability
- Indirect impact: Problem diagnosis
- Affected area: Operations and debugging
**Severity**: MINOR
**Improvement Suggestions**:
```java
// In AbstractReadStrategy.getFileInfoListByPath()
if (sortFilesByModTime) {
    int size = fileInfoList.size();
    if (size > 0) {
        long start = System.currentTimeMillis();
        fileInfoList.sort(Comparator.comparingLong(FileInfo::getModifyTime).reversed());
        long duration = System.currentTimeMillis() - start;
        log.info(
                "Sorted {} files by modification time (descending) in {} ms. "
                        + "Latest file: {}, Oldest file: {}",
                size, duration,
                fileInfoList.get(0).getFileName(),
                fileInfoList.get(size - 1).getFileName());
    }
}

// In ParquetReadStrategy.read()
String fieldName = seaTunnelRowType.getFieldName(i);
Object data;
if (record.hasField(fieldName)) {
    data = record.get(fieldName);
} else {
    if (missingFieldCount == 0) {
        log.warn(
                "Schema mismatch detected: Field '{}' not found in parquet record. "
                        + "Expected schema: {}, Actual schema: {}. "
                        + "Missing fields will be set to null.",
                fieldName, seaTunnelRowType, record.getSchema());
    }
    missingFieldCount++;
    data = null;
}
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
```
**Rationale**: Sufficient logs and metrics are the foundation of production
environment observability.
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]