This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit eaa95144a73173b0166b10153eb06efe1aa46342
Author: Siyang Tang <[email protected]>
AuthorDate: Wed Sep 20 11:41:52 2023 +0800

    [fix](broker-load) fix file offset for compressed file #24564
    
    Co-authored-by: Kang <[email protected]>
---
 .../org/apache/doris/planner/external/FileGroupInfo.java    | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index 80fd901345..2b412da825 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -32,6 +32,7 @@ import org.apache.doris.planner.FileLoadScanNode;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanRange;
@@ -206,16 +207,17 @@ public class FileGroupInfo {
             // header_type
             TFileFormatType formatType = 
formatType(context.fileGroup.getFileFormat(), fileStatus.path);
             context.params.setFormatType(formatType);
-            context.params.setCompressType(
-                    
Util.getOrInferCompressType(context.fileGroup.getCompressType(), 
fileStatus.path)
-            );
+            TFileCompressType compressType =
+                    
Util.getOrInferCompressType(context.fileGroup.getCompressType(), 
fileStatus.path);
+            context.params.setCompressType(compressType);
             List<String> columnsFromPath = 
BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan 
ranges.
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) 
{
                 // Now only support split plain text
-                if ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && 
fileStatus.isSplitable)
+                if (compressType == TFileCompressType.PLAIN
+                        && (formatType == TFileFormatType.FORMAT_CSV_PLAIN && 
fileStatus.isSplitable)
                         || formatType == TFileFormatType.FORMAT_JSON) {
                     long rangeBytes = bytesPerInstance - curInstanceBytes;
                     TFileRangeDesc rangeDesc = 
createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
@@ -223,10 +225,9 @@ public class FileGroupInfo {
                     
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                     curFileOffset += rangeBytes;
                 } else {
-                    TFileRangeDesc rangeDesc = 
createFileRangeDesc(curFileOffset, fileStatus, leftBytes,
+                    TFileRangeDesc rangeDesc = createFileRangeDesc(0, 
fileStatus, leftBytes,
                             columnsFromPath);
                     
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                    curFileOffset = 0;
                     i++;
                 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to