morningman commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r906693567


##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -158,9 +156,10 @@ public ExternalFileScanNode(
             PlanNodeId id,
             TupleDescriptor desc,
             String planNodeName) throws MetaNotFoundException {
-        super(id, desc, planNodeName, StatisticalType.BROKER_SCAN_NODE);
 
-        this.hmsTable = (HMSExternalTable) desc.getTable();
+        super(id, desc, planNodeName, StatisticalType.FILE_SCAN_NODE);
+
+        this.hmsTable = (HMSExternalTable) this.desc.getTable();
 
         DLAType type = getDLAType();

Review Comment:
   Looks like the type can be save in hmsTable, no need to check it every time.



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,55 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: required i8 column_separator;
+    4: required i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot 
and partition slot.
+  1: required Types.TTupleId src_tuple_id;
+  // num_of_columns_from_file can spilt the all_file_slot and 
all_partition_slot
+  2: required i32 num_of_columns_from_file;
+  // all selected slots which may compose from file and partiton value.
+  3: required list<TFileScanSlotInfo> required_slots;
+
+  4: optional TFileTextScanRangeParams text_params;
+}
+
+struct TFileRangeDesc {
+    1: required Types.TFileType file_type;
+    2: required TFileFormatType format_type;
+    // Path of this range
+    3: required string path;
+    // Offset of this file start
+    4: required i64 start_offset;
+    // Size of this range, if size = -1, this means that will read to then end 
of file
+    5: required i64 size;
+    // total size of the file
+    6: optional i64 file_size;
+
+    // columns parsed from file path should be after the columns read from file
+    7: optional list<string> columns_from_path;
 
+    8: optional THdfsParams hdfs_params;
+}
+
+// HDFS file scan range
+struct TFileScanRange {
+    1: required list<TFileRangeDesc> ranges
+    2: required TFileScanRangeParams params
+    3: required list<Types.TNetworkAddress> broker_addresses

Review Comment:
   Do we need to support reading via broker?



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -265,6 +314,15 @@ struct TBrokerScanNode {
     4: optional list<Exprs.TExpr> pre_filter_exprs
 }
 
+struct TFileScanNode {
+    1: required Types.TTupleId tuple_id
+
+    // Partition info used to process partition select in broker load
+    2: optional list<Exprs.TExpr> partition_exprs
+    3: optional list<Partitions.TRangePartition> partition_infos
+    4: optional list<Exprs.TExpr> pre_filter_exprs

Review Comment:
   Do we still need fields 2,3,4?



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,55 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: required i8 column_separator;

Review Comment:
   use `optional` for all fields.
   Same suggestion for all other structs.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -198,41 +197,58 @@ public void init(Analyzer analyzer) throws UserException {
 
     private void initContext(ParamCreateContext context) throws DdlException, 
MetaNotFoundException {

Review Comment:
   No need to pass the param `context`, because `context` is already a field of 
this class.



##########
be/src/vec/exec/file_arrow_scanner.h:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/array.h>
+#include <exec/arrow/arrow_reader.h>
+#include <exec/arrow/orc_reader.h>
+
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/base_scanner.h"
+#include "vec/exec/file_scanner.h"
+#include "util/runtime_profile.h"
+
+namespace doris::vectorized {
+
+// VArrow scanner convert the data read from orc|parquet to doris's columns.
+class FileArrowScanner : public FileScanner {

Review Comment:
   What is different between `FileArrowScanner` and `VArrowScanner` ?
   Looks like the `VArrowScanner` first read data into src block, and then 
convert to dest block?
   And `FileArrowScanner` read data directly into dest block?



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -198,41 +197,58 @@ public void init(Analyzer analyzer) throws UserException {
 
     private void initContext(ParamCreateContext context) throws DdlException, 
MetaNotFoundException {
         context.srcTupleDescriptor = 
analyzer.getDescTbl().createTupleDescriptor();
-        context.slotDescByName = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-        context.params = new TBrokerScanRangeParams();
+        context.params = new TFileScanRangeParams();
         if 
(scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) {
             Map<String, String> serDeInfoParams = 
hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
             String columnSeparator = 
Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
                     ? HIVE_DEFAULT_COLUMN_SEPARATOR : 
serDeInfoParams.get("field.delim");
             String lineDelimiter = 
Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
                     ? HIVE_DEFAULT_LINE_DELIMITER : 
serDeInfoParams.get("line.delim");
-            
context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]);
-            
context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]);
-            context.params.setColumnSeparatorStr(columnSeparator);
-            context.params.setLineDelimiterStr(lineDelimiter);
-            
context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length);
-            
context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length);
+            TFileTextScanRangeParams textParams = new 
TFileTextScanRangeParams();
+            
textParams.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]);
+            
textParams.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]);
+            textParams.setColumnSeparatorStr(columnSeparator);
+            textParams.setLineDelimiterStr(lineDelimiter);
+            
textParams.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length);
+            
textParams.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length);
+            context.params.setTextParams(textParams);
         }
 
-        Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
+        
context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt());
+        // Need re compute memory layout after set some slot descriptor to 
nullable
+        context.srcTupleDescriptor.computeStatAndMemLayout();
+
+        Map<String, SlotDescriptor> slotDescByName = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
         List<Column> columns = hmsTable.getBaseSchema(false);
         for (Column column : columns) {
             SlotDescriptor slotDesc = 
analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
-            slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+            slotDesc.setType(column.getType());
             slotDesc.setIsMaterialized(true);
             slotDesc.setIsNullable(true);
-            slotDesc.setColumn(new Column(column.getName(), 
PrimitiveType.VARCHAR));
-            context.params.addToSrcSlotIds(slotDesc.getId().asInt());
+            slotDesc.setColumn(new Column(column));
             slotDescByName.put(column.getName(), slotDesc);
         }
-        context.slotDescByName = slotDescByName;
+
+        for (FieldSchema fieldSchema : 
hmsTable.getRemoteTable().getPartitionKeys()) {

Review Comment:
   Are you sure that all partition key columns are in file path, not in file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to