This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a584cc04178 [feature](paimon)support native reader for 2.0 (#29612)
a584cc04178 is described below

commit a584cc04178cc58ff8596ba072fdf7e970364323
Author: wuwenchi <[email protected]>
AuthorDate: Sun Jan 7 21:46:04 2024 +0800

    [feature](paimon)support native reader for 2.0 (#29612)
    
    bp #29339
---
 be/src/vec/exec/scan/vfile_scanner.cpp             |  17 +-
 .../java/org/apache/doris/common/FeConstants.java  |  30 +-
 .../org/apache/doris/common/util/LocationPath.java | 380 +++++++++++++++++++++
 .../property/constants/PaimonProperties.java       |   1 +
 .../planner/external/paimon/PaimonScanNode.java    |  58 +++-
 .../planner/external/paimon/PaimonSource.java      |   5 +
 .../doris/planner/external/paimon/PaimonSplit.java |  26 +-
 fe/pom.xml                                         |   4 +-
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 9 files changed, 497 insertions(+), 25 deletions(-)

diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 72ae8e2d73f..d6ba62db53a 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -676,11 +676,22 @@ Status VFileScanner::_get_next_reader() {
         // JNI reader can only push down column value range
         bool push_down_predicates =
                 !_is_load && _params->format_type != 
TFileFormatType::FORMAT_JNI;
-        if (format_type == TFileFormatType::FORMAT_JNI && 
range.__isset.table_format_params &&
-            range.table_format_params.table_format_type == "hudi") {
-            if (range.table_format_params.hudi_params.delta_logs.empty()) {
+        if (format_type == TFileFormatType::FORMAT_JNI && 
range.__isset.table_format_params) {
+            if (range.table_format_params.table_format_type == "hudi" &&
+                range.table_format_params.hudi_params.delta_logs.empty()) {
                 // fall back to native reader if there is no log file
                 format_type = TFileFormatType::FORMAT_PARQUET;
+            } else if (range.table_format_params.table_format_type == "paimon" 
&&
+                       
!range.table_format_params.paimon_params.__isset.paimon_split) {
+                // use native reader
+                auto format = 
range.table_format_params.paimon_params.file_format;
+                if (format == "orc") {
+                    format_type = TFileFormatType::FORMAT_ORC;
+                } else if (format == "parquet") {
+                    format_type = TFileFormatType::FORMAT_PARQUET;
+                } else {
+                    return Status::InternalError("Not supported paimon file 
format: {}", format);
+                }
             }
         }
         bool need_to_get_parsed_schema = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 487d50283d9..3012f6a62e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -65,21 +65,21 @@ public class FeConstants {
     public static long tablet_checker_interval_ms = 20 * 1000L;
     public static long tablet_schedule_interval_ms = 1000L;
 
-    public static String FS_PREFIX_S3 = "s3";
-    public static String FS_PREFIX_S3A = "s3a";
-    public static String FS_PREFIX_S3N = "s3n";
-    public static String FS_PREFIX_OSS = "oss";
-    public static String FS_PREFIX_GCS = "gs";
-    public static String FS_PREFIX_BOS = "bos";
-    public static String FS_PREFIX_COS = "cos";
-    public static String FS_PREFIX_COSN = "cosn";
-    public static String FS_PREFIX_OBS = "obs";
-    public static String FS_PREFIX_OFS = "ofs";
-    public static String FS_PREFIX_GFS = "gfs";
-    public static String FS_PREFIX_JFS = "jfs";
-    public static String FS_PREFIX_HDFS = "hdfs";
-    public static String FS_PREFIX_VIEWFS = "viewfs";
-    public static String FS_PREFIX_FILE = "file";
+    public static final String FS_PREFIX_S3 = "s3";
+    public static final String FS_PREFIX_S3A = "s3a";
+    public static final String FS_PREFIX_S3N = "s3n";
+    public static final String FS_PREFIX_OSS = "oss";
+    public static final String FS_PREFIX_GCS = "gs";
+    public static final String FS_PREFIX_BOS = "bos";
+    public static final String FS_PREFIX_COS = "cos";
+    public static final String FS_PREFIX_COSN = "cosn";
+    public static final String FS_PREFIX_OBS = "obs";
+    public static final String FS_PREFIX_OFS = "ofs";
+    public static final String FS_PREFIX_GFS = "gfs";
+    public static final String FS_PREFIX_JFS = "jfs";
+    public static final String FS_PREFIX_HDFS = "hdfs";
+    public static final String FS_PREFIX_VIEWFS = "viewfs";
+    public static final String FS_PREFIX_FILE = "file";
 
     public static final String INTERNAL_DB_NAME = "__internal_schema";
     public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = 
"internal_tmp_materialized_view_";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
new file mode 100644
index 00000000000..d56e67bb0d1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.property.constants.CosProperties;
+import org.apache.doris.datasource.property.constants.ObsProperties;
+import org.apache.doris.datasource.property.constants.OssProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.FileSystemType;
+import org.apache.doris.thrift.TFileType;
+
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocationPath {
+    private static final Logger LOG = LogManager.getLogger(LocationPath.class);
+    private static final String SCHEME_DELIM = "://";
+    private static final String NONSTANDARD_SCHEME_DELIM = ":/";
+    private final LocationType locationType;
+    private final String location;
+
+    enum LocationType {
+        HDFS,
+        LOCAL, // Local File
+        BOS, // Baidu
+        GCS, // Google,
+        OBS, // Huawei,
+        COS, // Tencent
+        COSN, // Tencent
+        OFS, // Tencent CHDFS
+        GFS, // Tencent GooseFs,
+        OSS, // Alibaba,
+        OSS_HDFS, // JindoFS on OSS
+        JFS, // JuiceFS,
+        S3,
+        S3A,
+        S3N,
+        VIEWFS,
+        UNKNOWN
+    }
+
+    private LocationPath(String location) {
+        this(location, new HashMap<>());
+    }
+
+    public LocationPath(String location, Map<String, String> props) {
+        String scheme = parseScheme(location).toLowerCase();
+        switch (scheme) {
+            case FeConstants.FS_PREFIX_HDFS:
+                locationType = LocationType.HDFS;
+                // Need add hdfs host to location
+                String host = props.get(HdfsResource.DSF_NAMESERVICES);
+                this.location = normalizedHdfsPath(location, host);
+                break;
+            case FeConstants.FS_PREFIX_S3:
+                locationType = LocationType.S3;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_S3A:
+                locationType = LocationType.S3A;
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_S3N:
+                // include the check for multi locations and in a table, such 
as both s3 and hdfs are in a table.
+                locationType = LocationType.S3N;
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_BOS:
+                locationType = LocationType.BOS;
+                // use s3 client to access
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_GCS:
+                locationType = LocationType.GCS;
+                // use s3 client to access
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_OSS:
+                if (isHdfsOnOssEndpoint(location)) {
+                    locationType = LocationType.OSS_HDFS;
+                    this.location = location;
+                } else {
+                    if (useS3EndPoint(props)) {
+                        this.location = convertToS3(location);
+                    } else {
+                        this.location = location;
+                    }
+                    locationType = LocationType.OSS;
+                }
+                break;
+            case FeConstants.FS_PREFIX_COS:
+                if (useS3EndPoint(props)) {
+                    this.location = convertToS3(location);
+                } else {
+                    this.location = location;
+                }
+                locationType = LocationType.COS;
+                break;
+            case FeConstants.FS_PREFIX_OBS:
+                if (useS3EndPoint(props)) {
+                    this.location = convertToS3(location);
+                } else {
+                    this.location = location;
+                }
+                locationType = LocationType.OBS;
+                break;
+            case FeConstants.FS_PREFIX_OFS:
+                locationType = LocationType.OFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_JFS:
+                locationType = LocationType.JFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_GFS:
+                locationType = LocationType.GFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_COSN:
+                // if treat cosn(tencent hadoop-cos) as a s3 file system, may 
bring incompatible issues
+                locationType = LocationType.COSN;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_VIEWFS:
+                locationType = LocationType.VIEWFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_FILE:
+                locationType = LocationType.LOCAL;
+                this.location = location;
+                break;
+            default:
+                locationType = LocationType.UNKNOWN;
+                this.location = location;
+        }
+    }
+
+    private static String parseScheme(String location) {
+        String[] schemeSplit = location.split(SCHEME_DELIM);
+        if (schemeSplit.length > 1) {
+            return schemeSplit[0];
+        } else {
+            schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
+            if (schemeSplit.length > 1) {
+                return schemeSplit[0];
+            }
+            throw new IllegalArgumentException("Fail to parse scheme, invalid 
location: " + location);
+        }
+    }
+
+    private boolean useS3EndPoint(Map<String, String> props) {
+        if (props.containsKey(ObsProperties.ENDPOINT)
+                || props.containsKey(OssProperties.ENDPOINT)
+                || props.containsKey(CosProperties.ENDPOINT)) {
+            return false;
+        }
+        // wide check range for the compatibility of s3 properties
+        return (props.containsKey(S3Properties.ENDPOINT) || 
props.containsKey(S3Properties.Env.ENDPOINT));
+    }
+
+    public static boolean isHdfsOnOssEndpoint(String location) {
+        // example: cn-shanghai.oss-dls.aliyuncs.com contains the 
"oss-dls.aliyuncs".
+        // 
https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
+        return location.contains("oss-dls.aliyuncs");
+    }
+
+    /**
+     * The converted path is used for FE to get metadata
+     * @param location origin location
+     * @return metadata location path. just convert when storage is compatible 
with s3 client.
+     */
+    private static String convertToS3(String location) {
+        LOG.debug("try convert location to s3 prefix: " + location);
+        int pos = findDomainPos(location);
+        return "s3" + location.substring(pos);
+    }
+
+    private static int findDomainPos(String rangeLocation) {
+        int pos = rangeLocation.indexOf("://");
+        if (pos == -1) {
+            throw new RuntimeException("No '://' found in location: " + 
rangeLocation);
+        }
+        return pos;
+    }
+
+    private static String normalizedHdfsPath(String location, String host) {
+        try {
+            // Hive partition may contain special characters such as ' ', '<', 
'>' and so on.
+            // Need to encode these characters before creating URI.
+            // But doesn't encode '/' and ':' so that we can get the correct 
uri host.
+            location = URLEncoder.encode(location, 
StandardCharsets.UTF_8.name())
+                    .replace("%2F", "/").replace("%3A", ":");
+            URI normalizedUri = new URI(location);
+            // compatible with 'hdfs:///' or 'hdfs:/'
+            if (StringUtils.isEmpty(normalizedUri.getHost())) {
+                location = URLDecoder.decode(location, 
StandardCharsets.UTF_8.name());
+                String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
+                String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
+                if (location.startsWith(brokenPrefix) && 
!location.startsWith(normalizedPrefix)) {
+                    location = location.replace(brokenPrefix, 
normalizedPrefix);
+                }
+                if (StringUtils.isNotEmpty(host)) {
+                    // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
+                    // Or hdfs:///abc to hdfs://name_service/abc
+                    return location.replace(normalizedPrefix, normalizedPrefix 
+ host + "/");
+                } else {
+                    // 'hdfs://null/' is equivalent to 'hdfs:///'
+                    if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) 
{
+                        // Do not support hdfs:///location
+                        throw new RuntimeException("Invalid location with 
empty host: " + location);
+                    } else {
+                        // Replace 'hdfs://key/' to '/key/', try access local 
NameNode on BE.
+                        return location.replace(normalizedPrefix, "/");
+                    }
+                }
+            }
+            return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
+        } catch (URISyntaxException | UnsupportedEncodingException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public static Pair<FileSystemType, String> getFSIdentity(String location, 
String bindBrokerName) {
+        LocationPath locationPath = new LocationPath(location);
+        FileSystemType fsType = (bindBrokerName != null) ? 
FileSystemType.BROKER : locationPath.getFileSystemType();
+        URI uri = locationPath.getPath().toUri();
+        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + 
Strings.nullToEmpty(uri.getAuthority());
+        return Pair.of(fsType, fsIdent);
+    }
+
+    private FileSystemType getFileSystemType() {
+        FileSystemType fsType;
+        switch (locationType) {
+            case S3:
+            case S3A:
+            case S3N:
+            case COS:
+            case OSS:
+            case OBS:
+            case BOS:
+            case GCS:
+                // All storage will use s3 client to access on BE, so need 
convert to s3
+                fsType = FileSystemType.S3;
+                break;
+            case COSN:
+            case OFS:
+                // ofs:// and cosn:// use the same underlying file system: 
Tencent Cloud HDFS, aka CHDFS
+                fsType = FileSystemType.OFS;
+                break;
+            case HDFS:
+            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib 
to access oss.
+            case VIEWFS:
+            case GFS:
+                fsType = FileSystemType.DFS;
+                break;
+            case JFS:
+                fsType = FileSystemType.JFS;
+                break;
+            case LOCAL:
+                fsType = FileSystemType.FILE;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown file system 
for location: " + location);
+        }
+        return fsType;
+    }
+
+    /**
+     * provide file type for BE.
+     * @param location the location is from fs.listFile
+     * @return on BE, we will use TFileType to get the suitable client to 
access storage.
+     */
+    public static TFileType getTFileType(String location) {
+        if (location == null || location.isEmpty()) {
+            return null;
+        }
+        LocationPath locationPath = new LocationPath(location);
+        switch (locationPath.getLocationType()) {
+            case S3:
+            case S3A:
+            case S3N:
+            case COS:
+            case OSS:
+            case OBS:
+            case BOS:
+            case GCS:
+                // now we only support S3 client for object storage on BE
+                return TFileType.FILE_S3;
+            case HDFS:
+            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib 
to access oss.
+            case VIEWFS:
+            case COSN:
+                return TFileType.FILE_HDFS;
+            case GFS:
+            case JFS:
+            case OFS:
+                return TFileType.FILE_BROKER;
+            case LOCAL:
+                return TFileType.FILE_LOCAL;
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * The converted path is used for BE
+     * @return BE scan range path
+     */
+    public Path toScanRangeLocation() {
+        switch (locationType) {
+            case S3:
+            case S3A:
+            case S3N:
+            case COS:
+            case OSS:
+            case OBS:
+            case BOS:
+            case GCS:
+                // All storage will use s3 client to access on BE, so need 
convert to s3
+                return new Path(convertToS3(location));
+            case HDFS:
+            case OSS_HDFS:
+            case VIEWFS:
+            case COSN:
+            case GFS:
+            case JFS:
+            case OFS:
+            case LOCAL:
+            default:
+                return getPath();
+        }
+    }
+
+    public LocationType getLocationType() {
+        return locationType;
+    }
+
+    public String get() {
+        return location;
+    }
+
+    public Path getPath() {
+        return new Path(location);
+    }
+
+    @Override
+    public String toString() {
+        return get();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 06d205ff221..318e2bac30a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 public class PaimonProperties {
     public static final String WAREHOUSE = "warehouse";
+    public static final String FILE_FORMAT = "file.format";
     public static final String PAIMON_PREFIX = "paimon";
     public static final String PAIMON_CATALOG_TYPE = "metastore";
     public static final String HIVE_METASTORE_URIS = "uri";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index 4ba87c878e4..678a0d77197 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
@@ -41,15 +42,20 @@ import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import avro.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.utils.InstantiationUtil;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -102,6 +108,11 @@ public class PaimonScanNode extends FileQueryScanNode {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
         TPaimonFileDesc fileDesc = new TPaimonFileDesc();
+        org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
+        if (split != null) {
+            fileDesc.setPaimonSplit(encodeObjectToString(split));
+        }
+        fileDesc.setFileFormat(source.getFileFormat());
         fileDesc.setPaimonSplit(encodeObjectToString(paimonSplit.getSplit()));
         fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
         
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> 
slot.getColumn().getName())
@@ -127,13 +138,52 @@ public class PaimonScanNode extends FileQueryScanNode {
         List<org.apache.paimon.table.source.Split> paimonSplits = 
readBuilder.withFilter(predicates)
                 .withProjection(projected)
                 .newScan().plan().splits();
+        boolean supportNative = supportNativeReader();
         for (org.apache.paimon.table.source.Split split : paimonSplits) {
-            PaimonSplit paimonSplit = new PaimonSplit(split);
-            splits.add(paimonSplit);
+            if (supportNative && split instanceof DataSplit) {
+                DataSplit dataSplit = (DataSplit) split;
+                Optional<List<RawFile>> optRowFiles = 
dataSplit.convertToRawFiles();
+                if (optRowFiles.isPresent()) {
+                    List<RawFile> rawFiles = optRowFiles.get();
+                    for (RawFile file : rawFiles) {
+                        LocationPath locationPath = new 
LocationPath(file.path(), source.getCatalog().getProperties());
+                        Path finalDataFilePath = 
locationPath.toScanRangeLocation();
+                        try {
+                            splits.addAll(
+                                    splitFile(
+                                        finalDataFilePath,
+                                        0,
+                                        null,
+                                        file.length(),
+                                        -1,
+                                        true,
+                                        null,
+                                        
PaimonSplit.PaimonSplitCreator.DEFAULT));
+                        } catch (IOException e) {
+                            throw new UserException("Paimon error to split 
file: " + e.getMessage(), e);
+                        }
+                    }
+                } else {
+                    splits.add(new PaimonSplit(split));
+                }
+            } else {
+                splits.add(new PaimonSplit(split));
+            }
         }
         return splits;
     }
 
+    private boolean supportNativeReader() {
+        String fileFormat = source.getFileFormat().toLowerCase();
+        switch (fileFormat) {
+            case "orc":
+            case "parquet":
+                return true;
+            default:
+                return false;
+        }
+    }
+
     // When calling 'setPaimonParams' and 'getSplits', the column trimming has 
not been performed yet,
     // therefore, paimon_column_names is temporarily reset here
     @Override
@@ -157,8 +207,8 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     public TFileType getLocationType(String location) throws DdlException, 
MetaNotFoundException {
-        //todo: no use
-        return TFileType.FILE_S3;
+        return 
Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
+            new DdlException("Unknown file location " + location + " for 
paimon table "));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
index 2f55e30c086..fa838350c82 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.PaimonExternalTable;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileAttributes;
 
@@ -61,4 +62,8 @@ public class PaimonSource {
     public ExternalCatalog getCatalog() {
         return paimonExtTable.getCatalog();
     }
+
+    public String getFileFormat() {
+        return 
originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc");
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
index 8ecf539db91..13263fb5842 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -18,21 +18,30 @@
 package org.apache.doris.planner.external.paimon;
 
 import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.SplitCreator;
 import org.apache.doris.planner.external.TableFormatType;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.paimon.table.source.Split;
 
+import java.util.List;
+
 public class PaimonSplit extends FileSplit {
     private Split split;
     private TableFormatType tableFormatType;
 
     public PaimonSplit(Split split) {
-        super(new Path("dummyPath"), 0, 0, 0, null, null);
+        super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
         this.split = split;
         this.tableFormatType = TableFormatType.PAIMON;
     }
 
+    public PaimonSplit(Path file, long start, long length, long fileLength, 
String[] hosts,
+                       List<String> partitionList) {
+        super(file, start, length, fileLength, hosts, partitionList);
+        this.tableFormatType = TableFormatType.PAIMON;
+    }
+
     public Split getSplit() {
         return split;
     }
@@ -49,4 +58,19 @@ public class PaimonSplit extends FileSplit {
         this.tableFormatType = tableFormatType;
     }
 
+    public static class PaimonSplitCreator implements SplitCreator {
+
+        static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
+
+        @Override
+        public org.apache.doris.spi.Split create(Path path,
+                                                 long start,
+                                                 long length,
+                                                 long fileLength,
+                                                 long modificationTime,
+                                                 String[] hosts,
+                                                 List<String> partitionValues) 
{
+            return new PaimonSplit(path, start, length, fileLength, hosts, 
partitionValues);
+        }
+    }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index 4f4de40a670..ad126d7111e 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -221,7 +221,7 @@ under the License.
         <doris.home>${fe.dir}/../</doris.home>
         <revision>1.2-SNAPSHOT</revision>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        
<doris.hive.catalog.shade.version>1.0.2</doris.hive.catalog.shade.version>
+        
<doris.hive.catalog.shade.version>1.0.3</doris.hive.catalog.shade.version>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <!--plugin parameters-->
@@ -342,7 +342,7 @@ under the License.
         <!--todo waiting release-->
         <quartz.version>2.3.2</quartz.version>
         <!-- paimon -->
-        <paimon.version>0.5.0-incubating</paimon.version>
+        <paimon.version>0.6.0-incubating</paimon.version>
         <disruptor.version>3.4.4</disruptor.version>
         <trino.parser.version>395</trino.parser.version>
         <!-- arrow flight sql -->
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index a57745e78d3..506dd6e3f4f 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -308,6 +308,7 @@ struct TPaimonFileDesc {
     8: optional i64 db_id
     9: optional i64 tbl_id
     10: optional i64 last_update_time
+    11: optional string file_format
 }
 
 struct TMaxComputeFileDesc {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to