0AyanamiRei commented on PR #61025:
URL: https://github.com/apache/doris/pull/61025#issuecomment-3999260306
Code Review: improve-tvf-skip-empty_file
Goal
When using TVF (Table-Valued Function) with csv_with_names or similar
formats, if the first file in a glob pattern is empty (or contains only
newlines), schema inference fails. This PR makes schema inference skip empty
files and try the next candidate.
Summary of Changes
- FE (fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java): refactored getTableColumns() to iterate through file candidates for schema inference, skipping files that return END_OF_FILE. Extracted helper methods buildFileScanRangeParams(), buildFileRangeDesc(), buildSchemaRequest(), fetchSchema(), and throwFetchError().
- BE (be/src/vec/exec/format/csv/csv_reader.cpp): changed _parse_col_nums() and _parse_col_names() to return Status::EndOfFile instead of Status::InternalError when the first line is empty. Also fixed BOM handling in enclose mode.
- BE (be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp): fixed a decompression progress check condition and added a debug injection point.
- Tests (regression-test/suites/load_p0/tvf/test_tvf_empty_file.groovy): added a test case for the csv_with_names format with a mix of empty and non-empty files.
Issues Found
Issue 1 (HIGH): buildFileScanRangeParams() has a side effect of adding to
fileStatuses and is called multiple times in the retry loop
In the non-stream path, getTableColumns() calls
getFetchTableStructureRequest(fileStatus) for each candidate, which internally
calls buildFileScanRangeParams(). However, buildFileScanRangeParams() contains this
code at lines 485-488:
```java
if (getTFileType() == TFileType.FILE_STREAM) {
    fileStatuses.add(new TBrokerFileStatus("", false, -1, true));
    fileScanRangeParams.setFileType(getTFileType());
}
```
The stream branch is guarded by the if (getTFileType() ==
TFileType.FILE_STREAM) check in getTableColumns(), so file-based sources
won't hit this code. Still, buildFileScanRangeParams() is called once per candidate
file in the retry loop, which is wasteful: the params are identical for every file.
Consider building the params once before the loop:

```java
// Before the for loop:
TFileScanRangeParams params = buildFileScanRangeParams();
for (TBrokerFileStatus fileStatus : fileStatuses) {
    ...
    PFetchTableSchemaRequest request =
            buildSchemaRequest(params, buildFileRangeDesc(fileStatus));
    ...
}
```
The overloaded getFetchTableStructureRequest(TBrokerFileStatus) (lines
551-554) also redundantly calls buildFileScanRangeParams() each time.
Issue 2 (MEDIUM): Double error check in fetchSchema() vs the caller loop
fetchSchema() at lines 338-340 already throws on statuses that are neither OK
nor END_OF_FILE:

```java
if (code != TStatusCode.OK && code != TStatusCode.END_OF_FILE) {
    throwFetchError(result, address);
}
```
But in the getTableColumns() loop at lines 313-314, the caller checks again:

```java
if (code != TStatusCode.OK) {
    throwFetchError(candidateResult, address);
}
```
This second check is redundant: fetchSchema() already guarantees that the
returned result has either OK or END_OF_FILE status. It is not a bug, but the
redundancy is confusing. Consider removing the error throwing from
fetchSchema() and keeping it only in the callers, or removing the duplicate
check from the loop body.
Issue 3 (MEDIUM): The stream path calls fetchSchema(), which now throws on
non-OK errors, but the original request == null case is handled differently
In the stream path (lines 291-296):

```java
if (getTFileType() == TFileType.FILE_STREAM) {
    PFetchTableSchemaRequest request = getFetchTableStructureRequest();
    if (request != null) {
        result = fetchSchema(address, request);
    }
}
```
If fetchSchema() returns END_OF_FILE for a stream source, result will carry the
END_OF_FILE status and be passed to fillColumns(result). fillColumns()
doesn't check the status; it proceeds to use result.getColumnNums() and so on.
This might work by accident (0 columns plus path columns), but the
behavior is inconsistent. The stream path should arguably never see
END_OF_FILE, so this is worth documenting or guarding with a check.
Issue 4 (MEDIUM): _parse_col_nums returning EndOfFile may confuse callers
other than fetch_table_schema
Changing _parse_col_nums() and _parse_col_names() from InternalError to
EndOfFile is a semantic change that affects all callers, not just the schema
inference path. In internal_service.cpp:894, get_parsed_schema returns the
status, which is then serialized to protobuf. But if CsvReader::_parse_col_nums
is called from the read path (not just schema inference), returning EndOfFile
might be misinterpreted as "no more data" rather than "parsing error."
Verify that _parse_col_nums and _parse_col_names are only called from
get_parsed_schema() (schema inference path). If they're used elsewhere,
EndOfFile could silently swallow a real error.
The EndOfFile status will be serialized to the protobuf result. The FE side
then checks for END_OF_FILE TStatusCode. This flow looks correct, but there's a
minor concern: the BE logs LOG(WARNING) << "fetch table schema failed" for what
is actually a normal condition (empty file). This will generate spurious
warnings in production logs.
Recommendation: add a special case before the warning:

```cpp
if (st.is<END_OF_FILE>()) {
    st.to_protobuf(result->mutable_status());
    return;
}
```
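To illustrate the intended behavior, here is a minimal, self-contained C++ sketch; the Status type and function names are toy stand-ins for illustration, not Doris's real classes. EOF is propagated quietly, while genuine errors still produce the warning message:

```cpp
#include <cassert>
#include <string>

// Toy stand-ins for Doris's Status/TStatusCode; names are illustrative only.
enum class Code { OK, END_OF_FILE, INTERNAL_ERROR };

struct Status {
    Code code = Code::OK;
    bool is_eof() const { return code == Code::END_OF_FILE; }
    bool ok() const { return code == Code::OK; }
};

// Returns the message that would be logged, or "" when nothing is logged.
// EOF (empty file) is a normal condition here: it is propagated to the
// caller without emitting a warning, while real errors still warn.
inline std::string log_line_for(const Status& st) {
    if (st.is_eof()) return "";  // early return; status still sent to FE
    if (!st.ok()) return "fetch table schema failed";
    return "";
}
```

With this shape, only genuinely failed fetches reach the warning path, so production logs no longer fill with warnings for empty files.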
Issue 5 (LOW): adjust_column_sep_positions lacks a bounds check
In be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h, lines 192-195:
```cpp
void adjust_column_sep_positions(size_t offset) {
    for (auto& pos : _column_sep_positions) {
        pos -= offset;
    }
}
```
If any position is less than offset, the unsigned subtraction would underflow
(this shouldn't happen for the BOM case, since the BOM is only 3 bytes and
positions should be >= 3). A DCHECK(pos >= offset) would be an appropriate
defensive assertion.
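As a minimal, self-contained sketch of the guarded version (a free function over std::vector with assert() standing in for Doris's DCHECK; the name and signature are illustrative, not the real member):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Guarded variant: subtracting a larger offset from a size_t silently wraps
// to a huge value, so the precondition is checked before each subtraction.
// assert() here plays the role of Doris's DCHECK.
inline void adjust_column_sep_positions_checked(std::vector<size_t>& positions,
                                                size_t offset) {
    for (auto& pos : positions) {
        assert(pos >= offset);  // unsigned subtraction would wrap otherwise
        pos -= offset;
    }
}
```

For the BOM case (offset 3), positions {3, 10, 25} become {0, 7, 22}; a position smaller than the offset trips the assertion in debug builds instead of silently wrapping near SIZE_MAX in release builds.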
Issue 6 (LOW): Commit messages are not descriptive
Commits like "f", "up", "up1", "add case", "add output file" provide no
information about the changes. Before merging, these should be squashed into a
single descriptive commit like: [fix](tvf) Skip empty files during TVF schema
inference for csv_with_names format.
Issue 7 (LOW): Test data dependency
The test references S3 files load_with_empty_{0,1}.csv but there's no
evidence these files were uploaded to the test bucket. The test will fail if
the data files don't exist. Confirm test data is uploaded.
Issue 8 (LOW): The decompression progress check change needs scrutiny
In be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp, line 477:
```cpp
// Old:
if ((input_read_bytes == 0 /*decompressed_len == 0*/) && _more_input_bytes == 0 && ...
// New:
if (input_read_bytes == 0 && decompressed_len == 0 && _more_input_bytes == 0 && ...
```
The old code had decompressed_len == 0 commented out, so it only checked
input_read_bytes == 0. The new code requires both to be 0. This is a correct
fix: it avoids falsely detecting "no progress" when the decompressor did
produce output. But it looks like a separate bugfix (it matches commit [#60852])
and should ideally be its own PR; including it here muddles the scope.
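The rationale can be sketched with a small, self-contained example (StepResult and the predicate names are toys for illustration, not Doris's API): a decompressor that consumes no new input while flushing buffered output is still making progress, and only the new predicate treats it that way.

```cpp
#include <cassert>
#include <cstddef>

// Toy model of one decompression step: how much input was consumed and how
// much output was produced.
struct StepResult {
    size_t input_read_bytes;
    size_t decompressed_len;
};

// Old predicate: flags "no progress" whenever no input was consumed, even
// if the decompressor emitted output from its internal buffer.
inline bool old_no_progress(const StepResult& r) {
    return r.input_read_bytes == 0;
}

// New predicate: stuck only when neither input was consumed nor output
// produced, matching the corrected condition in the PR.
inline bool new_no_progress(const StepResult& r) {
    return r.input_read_bytes == 0 && r.decompressed_len == 0;
}
```

A flush step {0, 4096} is a false positive under the old predicate but counts as progress under the new one; a genuinely stuck step {0, 0} is flagged by both.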
What looks good
- Clean refactoring of getFetchTableStructureRequest() into reusable helper methods (buildFileScanRangeParams(), buildFileRangeDesc(), buildSchemaRequest())
- Proper use of the END_OF_FILE status code as a signal rather than an error
- The BOM fix in enclose mode is a real correctness fix
- The test covers both SELECT and DESC to verify schema inference
Recommendations
- Must fix: build TFileScanRangeParams once before the retry loop (Issue 1); it currently does redundant work per candidate file
- Should fix: suppress the BE warning log for END_OF_FILE in fetch_table_schema (Issue 4)
- Should do: squash commits before merge (Issue 6)
- Consider: add a DCHECK in adjust_column_sep_positions (Issue 5)
- Consider: split the decompression fix and the BOM fix into their own PRs for a cleaner review history
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]