This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bfe65565d80 [feature](paimon)support native reader (#29339)
bfe65565d80 is described below

commit bfe65565d80c8dd3bf8d41a8e3656cfaadaa665a
Author: wuwenchi <[email protected]>
AuthorDate: Thu Jan 4 14:31:48 2024 +0800

    [feature](paimon)support native reader (#29339)
    
    Support native reader for paimon.
    
    Upgrade paimon 0.5 to 0.6 : apache/doris-shade#32
---
 be/src/vec/exec/scan/vfile_scanner.cpp             | 17 +++++--
 .../property/constants/PaimonProperties.java       |  1 +
 .../planner/external/paimon/PaimonScanNode.java    | 59 ++++++++++++++++++++--
 .../planner/external/paimon/PaimonSource.java      |  5 ++
 .../doris/planner/external/paimon/PaimonSplit.java | 26 +++++++++-
 fe/pom.xml                                         |  4 +-
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 7 files changed, 102 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6002f4eea67..c70295e1313 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -736,11 +736,22 @@ Status VFileScanner::_get_next_reader() {
         // JNI reader can only push down column value range
         bool push_down_predicates =
                 !_is_load && _params->format_type != 
TFileFormatType::FORMAT_JNI;
-        if (format_type == TFileFormatType::FORMAT_JNI && 
range.__isset.table_format_params &&
-            range.table_format_params.table_format_type == "hudi") {
-            if (range.table_format_params.hudi_params.delta_logs.empty()) {
+        if (format_type == TFileFormatType::FORMAT_JNI && 
range.__isset.table_format_params) {
+            if (range.table_format_params.table_format_type == "hudi" &&
+                range.table_format_params.hudi_params.delta_logs.empty()) {
                 // fall back to native reader if there is no log file
                 format_type = TFileFormatType::FORMAT_PARQUET;
+            } else if (range.table_format_params.table_format_type == "paimon" 
&&
+                       
!range.table_format_params.paimon_params.__isset.paimon_split) {
+                // use native reader
+                auto format = 
range.table_format_params.paimon_params.file_format;
+                if (format == "orc") {
+                    format_type = TFileFormatType::FORMAT_ORC;
+                } else if (format == "parquet") {
+                    format_type = TFileFormatType::FORMAT_PARQUET;
+                } else {
+                    return Status::InternalError("Not supported paimon file 
format: {}", format);
+                }
             }
         }
         bool need_to_get_parsed_schema = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 06d205ff221..318e2bac30a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 public class PaimonProperties {
     public static final String WAREHOUSE = "warehouse";
+    public static final String FILE_FORMAT = "file.format";
     public static final String PAIMON_PREFIX = "paimon";
     public static final String PAIMON_CATALOG_TYPE = "metastore";
     public static final String HIVE_METASTORE_URIS = "uri";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index 1e23d2f781d..cf822901992 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
@@ -41,15 +42,20 @@ import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.utils.InstantiationUtil;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -102,7 +108,11 @@ public class PaimonScanNode extends FileQueryScanNode {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
         TPaimonFileDesc fileDesc = new TPaimonFileDesc();
-        fileDesc.setPaimonSplit(encodeObjectToString(paimonSplit.getSplit()));
+        org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
+        if (split != null) {
+            fileDesc.setPaimonSplit(encodeObjectToString(split));
+        }
+        fileDesc.setFileFormat(source.getFileFormat());
         fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
         
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> 
slot.getColumn().getName())
                 .collect(Collectors.joining(",")));
@@ -127,13 +137,52 @@ public class PaimonScanNode extends FileQueryScanNode {
         List<org.apache.paimon.table.source.Split> paimonSplits = 
readBuilder.withFilter(predicates)
                 .withProjection(projected)
                 .newScan().plan().splits();
+        boolean supportNative = supportNativeReader();
         for (org.apache.paimon.table.source.Split split : paimonSplits) {
-            PaimonSplit paimonSplit = new PaimonSplit(split);
-            splits.add(paimonSplit);
+            if (supportNative && split instanceof DataSplit) {
+                DataSplit dataSplit = (DataSplit) split;
+                Optional<List<RawFile>> optRowFiles = 
dataSplit.convertToRawFiles();
+                if (optRowFiles.isPresent()) {
+                    List<RawFile> rawFiles = optRowFiles.get();
+                    for (RawFile file : rawFiles) {
+                        LocationPath locationPath = new 
LocationPath(file.path(), source.getCatalog().getProperties());
+                        Path finalDataFilePath = 
locationPath.toScanRangeLocation();
+                        try {
+                            splits.addAll(
+                                    splitFile(
+                                        finalDataFilePath,
+                                        0,
+                                        null,
+                                        file.length(),
+                                        -1,
+                                        true,
+                                        null,
+                                        
PaimonSplit.PaimonSplitCreator.DEFAULT));
+                        } catch (IOException e) {
+                            throw new UserException("Paimon error to split 
file: " + e.getMessage(), e);
+                        }
+                    }
+                } else {
+                    splits.add(new PaimonSplit(split));
+                }
+            } else {
+                splits.add(new PaimonSplit(split));
+            }
         }
         return splits;
     }
 
+    private boolean supportNativeReader() {
+        String fileFormat = source.getFileFormat().toLowerCase();
+        switch (fileFormat) {
+            case "orc":
+            case "parquet":
+                return true;
+            default:
+                return false;
+        }
+    }
+
     //When calling 'setPaimonParams' and 'getSplits', the column trimming has 
not been performed yet,
     // Therefore, paimon_column_names is temporarily reset here
     @Override
@@ -157,8 +206,8 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     public TFileType getLocationType(String location) throws DdlException, 
MetaNotFoundException {
-        //todo: no use
-        return TFileType.FILE_S3;
+        return 
Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
+            new DdlException("Unknown file location " + location + " for 
paimon table "));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
index 2f55e30c086..fa838350c82 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.PaimonExternalTable;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileAttributes;
 
@@ -61,4 +62,8 @@ public class PaimonSource {
     public ExternalCatalog getCatalog() {
         return paimonExtTable.getCatalog();
     }
+
+    public String getFileFormat() {
+        return 
originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc");
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
index 8ecf539db91..13263fb5842 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -18,21 +18,30 @@
 package org.apache.doris.planner.external.paimon;
 
 import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.SplitCreator;
 import org.apache.doris.planner.external.TableFormatType;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.paimon.table.source.Split;
 
+import java.util.List;
+
 public class PaimonSplit extends FileSplit {
     private Split split;
     private TableFormatType tableFormatType;
 
     public PaimonSplit(Split split) {
-        super(new Path("dummyPath"), 0, 0, 0, null, null);
+        super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
         this.split = split;
         this.tableFormatType = TableFormatType.PAIMON;
     }
 
+    public PaimonSplit(Path file, long start, long length, long fileLength, 
String[] hosts,
+                       List<String> partitionList) {
+        super(file, start, length, fileLength, hosts, partitionList);
+        this.tableFormatType = TableFormatType.PAIMON;
+    }
+
     public Split getSplit() {
         return split;
     }
@@ -49,4 +58,19 @@ public class PaimonSplit extends FileSplit {
         this.tableFormatType = tableFormatType;
     }
 
+    public static class PaimonSplitCreator implements SplitCreator {
+
+        static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
+
+        @Override
+        public org.apache.doris.spi.Split create(Path path,
+                                                 long start,
+                                                 long length,
+                                                 long fileLength,
+                                                 long modificationTime,
+                                                 String[] hosts,
+                                                 List<String> partitionValues) 
{
+            return new PaimonSplit(path, start, length, fileLength, hosts, 
partitionValues);
+        }
+    }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index ef0747931cf..dbb691ddf2d 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -221,7 +221,7 @@ under the License.
         <doris.home>${fe.dir}/../</doris.home>
         <revision>1.2-SNAPSHOT</revision>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        
<doris.hive.catalog.shade.version>1.0.2</doris.hive.catalog.shade.version>
+        
<doris.hive.catalog.shade.version>1.0.3</doris.hive.catalog.shade.version>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <!--plugin parameters-->
@@ -343,7 +343,7 @@ under the License.
         <!--todo waiting release-->
         <quartz.version>2.3.2</quartz.version>
         <!-- paimon -->
-        <paimon.version>0.5.0-incubating</paimon.version>
+        <paimon.version>0.6.0-incubating</paimon.version>
         <disruptor.version>3.4.4</disruptor.version>
         <trino.parser.version>395</trino.parser.version>
         <!-- arrow flight sql -->
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0e35f6ebc59..23da0f23ebb 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -311,6 +311,7 @@ struct TPaimonFileDesc {
     8: optional i64 db_id
     9: optional i64 tbl_id
     10: optional i64 last_update_time
+    11: optional string file_format
 }
 
 struct TMaxComputeFileDesc {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to