This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit eaa95144a73173b0166b10153eb06efe1aa46342
Author: Siyang Tang <[email protected]>
AuthorDate: Wed Sep 20 11:41:52 2023 +0800

    [fix](broker-load) fix file offset for compressed file #24564

    Co-authored-by: Kang <[email protected]>
---
 .../org/apache/doris/planner/external/FileGroupInfo.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index 80fd901345..2b412da825 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -32,6 +32,7 @@ import org.apache.doris.planner.FileLoadScanNode;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanRange;
@@ -206,16 +207,17 @@ public class FileGroupInfo {
                 // header_type
                 TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
                 context.params.setFormatType(formatType);
-                context.params.setCompressType(
-                        Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path)
-                );
+                TFileCompressType compressType =
+                        Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
+                context.params.setCompressType(compressType);
                 List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                         context.fileGroup.getColumnNamesFromPath());
                 // Assign scan range locations only for broker load.
                 // stream load has only one file, and no need to set multi scan ranges.
                 if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
                     // Now only support split plain text
-                    if ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
+                    if (compressType == TFileCompressType.PLAIN
+                            && (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
                             || formatType == TFileFormatType.FORMAT_JSON) {
                         long rangeBytes = bytesPerInstance - curInstanceBytes;
                         TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
@@ -223,10 +225,9 @@ public class FileGroupInfo {
                         curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                         curFileOffset += rangeBytes;
                     } else {
-                        TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes,
+                        TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes,
                                 columnsFromPath);
                         curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                        curFileOffset = 0;
                         i++;
                     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
