This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 74ddc0d61d4 [improvement](fe and broker) support specify broker to getSplits, check isSplitable, file scan for HMS Multi-catalog (#24830) (#27236)
74ddc0d61d4 is described below

commit 74ddc0d61d4e82c743e8ad6d45c9b4de9c61e806
Author: DuRipeng <[email protected]>
AuthorDate: Sun Nov 19 22:20:47 2023 +0800

    [improvement](fe and broker) support specify broker to getSplits, check isSplitable, file scan for HMS Multi-catalog (#24830) (#27236)
    
    bp #24830
---
 .../apache/doris/datasource/ExternalCatalog.java   |   8 ++
 .../doris/datasource/HMSExternalCatalog.java       |   2 +
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  54 ++++++----
 .../apache/doris/external/hive/util/HiveUtil.java  |  12 ++-
 .../java/org/apache/doris/fs/FileSystemCache.java  |  17 +++-
 .../org/apache/doris/fs/FileSystemFactory.java     |  11 +-
 .../java/org/apache/doris/fs/FileSystemType.java   |   1 +
 .../apache/doris/fs/remote/BrokerFileSystem.java   |  87 ++++++++++++++++
 .../doris/planner/external/HiveScanNode.java       |  25 +++--
 .../doris/statistics/util/StatisticsUtil.java      |   3 +-
 fs_brokers/apache_hdfs_broker/pom.xml              |  26 ++++-
 .../doris/broker/hdfs/FileSystemManager.java       |  55 ++++++++--
 .../doris/broker/hdfs/HDFSBrokerServiceImpl.java   |  44 ++++++++
 .../java/org/apache/doris/common/HiveUtils.java    | 112 +++++++++++++++++++++
 gensrc/thrift/PaloBrokerService.thrift             |  20 ++++
 15 files changed, 431 insertions(+), 46 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 27bd524c1eb..1ed6bd027aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -592,6 +592,14 @@ public abstract class ExternalCatalog
         return ret;
     }
 
+    public String bindBrokerName() {
+        Map<String, String> properties = catalogProperty.getProperties();
+        if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+            return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+        }
+        return null;
+    }
+
     @Override
     public Collection<DatabaseIf> getAllDbs() {
         makeSureInitialized();
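
Note: bindBrokerName() returns null when the catalog carries no "broker.name" property, so every downstream call site can treat "no broker bound" and "broker bound" uniformly. A minimal standalone sketch of that contract (the property map and broker name below are hypothetical, not taken from this patch):

    import java.util.HashMap;
    import java.util.Map;

    class BindBrokerLookupSketch {
        // mirrors HMSExternalCatalog.BIND_BROKER_NAME
        static final String BIND_BROKER_NAME = "broker.name";

        // same contract as ExternalCatalog.bindBrokerName(): null means no broker bound
        static String bindBrokerName(Map<String, String> properties) {
            return properties.get(BIND_BROKER_NAME);
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(BIND_BROKER_NAME, "hdfs_broker"); // hypothetical broker name
            System.out.println(bindBrokerName(props));            // hdfs_broker
            System.out.println(bindBrokerName(new HashMap<>()));  // null
        }
    }
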
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 47562ebb1f1..6e3543dfcce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -61,6 +61,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private long lastSyncedEventId = -1L;
     public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
     public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
+    // broker name for file split and query scan.
+    public static final String BIND_BROKER_NAME = "broker.name";
     private static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
 
     // -1 means file cache no ttl set
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 850619cb246..bac891eb920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -353,11 +353,13 @@ public class HiveMetaStoreCache {
     // Get File Status by using FileSystem API.
     private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
                                         JobConf jobConf,
-                                        List<String> partitionValues) throws UserException {
+                                        List<String> partitionValues,
+                                        String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+                    location, bindBrokerName), jobConf, bindBrokerName));
+        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
         try {
             // For Tez engine, it may generate subdirectoies for "union" query.
             // So there may be files and directories in the table directory at the same time. eg:
@@ -419,7 +421,8 @@ public class HiveMetaStoreCache {
                 InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues());
+                    result = getFileCache(finalLocation, inputFormat, jobConf,
+                        key.getPartitionValues(), key.bindBrokerName);
                 } else {
                     InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -498,23 +501,23 @@ public class HiveMetaStoreCache {
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
-            boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, true);
+            boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, true, bindBrokerName);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
-            boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, false);
+            boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, false, bindBrokerName);
     }
 
     private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-            boolean useSelfSplitter, boolean withCache) {
+            boolean useSelfSplitter, boolean withCache, String bindBrokerName) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> {
             FileCacheKey fileCacheKey = p.isDummyPartition()
                     ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
-                    p.getInputFormat(), useSelfSplitter)
-                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+                    p.getInputFormat(), useSelfSplitter, bindBrokerName)
+                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
             fileCacheKey.setUseSelfSplitter(useSelfSplitter);
             return fileCacheKey;
         }).collect(Collectors.toList());
@@ -592,7 +595,7 @@ public class HiveMetaStoreCache {
                 HivePartition partition = partitionCache.getIfPresent(partKey);
                 if (partition != null) {
                     fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                            null, partition.getPartitionValues()));
+                            null, partition.getPartitionValues(), null));
                     partitionCache.invalidate(partKey);
                 }
             }
@@ -610,7 +613,7 @@ public class HiveMetaStoreCache {
              * and FE will exit if some network problems occur.
              * */
             FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, false);
+                    dbName, tblName, null, null, false, null);
             fileCacheRef.get().invalidate(fileCacheKey);
         }
     }
@@ -625,7 +628,7 @@ public class HiveMetaStoreCache {
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
                 fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                        null, partition.getPartitionValues()));
+                        null, partition.getPartitionValues(), null));
                 partitionCache.invalidate(partKey);
             }
         }
@@ -771,7 +774,7 @@ public class HiveMetaStoreCache {
     }
 
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, long tableId) {
+            boolean isFullAcid, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
         try {
@@ -802,7 +805,8 @@ public class HiveMetaStoreCache {
                     String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
-                                    FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
+                                    FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+                                            bindBrokerName), jobConf, bindBrokerName));
                     Status status = fs.exists(acidVersionPath);
                     if (status != Status.OK) {
                         if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -823,7 +827,9 @@ public class HiveMetaStoreCache {
                 for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
                     String location = delta.getPath().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                            jobConf, bindBrokerName));
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                     if (delta.isDeleteDelta()) {
                         List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
@@ -841,7 +847,9 @@ public class HiveMetaStoreCache {
                 if (directory.getBaseDirectory() != null) {
                     String location = directory.getBaseDirectory().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                            jobConf, bindBrokerName));
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                     locatedFiles.files().stream().filter(
                             f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
@@ -939,6 +947,8 @@ public class HiveMetaStoreCache {
         private String location;
         // not in key
         private String inputFormat;
+        // Broker name for file split and file scan.
+        private String bindBrokerName;
         // Temp variable, use self file splitter or use InputFormat.getSplits.
         // Will remove after self splitter is stable.
         private boolean useSelfSplitter;
@@ -947,16 +957,18 @@ public class HiveMetaStoreCache {
         // partitionValues would be ["part1", "part2"]
         protected List<String> partitionValues;
 
-        public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
+        public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
             this.location = location;
             this.inputFormat = inputFormat;
             this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.useSelfSplitter = true;
+            this.bindBrokerName = bindBrokerName;
         }
 
         public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
-                                                       String inputFormat, boolean useSelfSplitter) {
-            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null);
+                                                       String inputFormat, boolean useSelfSplitter,
+                                                       String bindBrokerName) {
+            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
             fileCacheKey.dummyKey = dbName + "." + tblName;
             fileCacheKey.useSelfSplitter = useSelfSplitter;
             return fileCacheKey;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index 4bf01910f82..deb048b5943 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -24,6 +24,8 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.BrokerFileSystem;
+import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -188,12 +190,17 @@ public final class HiveUtil {
         }
     }
 
-    public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
+                                       String location, JobConf jobConf) throws UserException {
+        if (remoteFileSystem instanceof BrokerFileSystem) {
+            return ((BrokerFileSystem) remoteFileSystem)
+                    .isSplittable(location, inputFormat.getClass().getCanonicalName());
+        }
+
         // ORC uses a custom InputFormat but is always splittable
         if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
             return true;
         }
-
         // use reflection to get isSplitable method on FileInputFormat
         // ATTN: the method name is actually "isSplitable", but the right spell is "isSplittable"
         Method method = null;
@@ -209,6 +216,7 @@ public final class HiveUtil {
         if (method == null) {
             return false;
         }
+        Path path = new Path(location);
         try {
             method.setAccessible(true);
             return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
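
For the non-broker path, the reflective probe for the protected FileInputFormat#isSplitable(FileSystem, Path) can be tried in isolation. A self-contained sketch (assumes hadoop-common and hadoop-mapreduce-client-core on the classpath; the local file path is hypothetical):

    import java.lang.reflect.Method;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    class IsSplitableProbeSketch {
        public static void main(String[] args) throws Exception {
            JobConf jobConf = new JobConf();
            TextInputFormat inputFormat = new TextInputFormat();
            inputFormat.configure(jobConf); // initializes the compression codec table
            Path path = new Path("file:///tmp/demo.txt"); // hypothetical file
            FileSystem fs = path.getFileSystem(jobConf);

            // isSplitable (sic) is protected, so walk the class hierarchy as the patch does
            Method method = null;
            for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
                try {
                    method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
                    break;
                } catch (NoSuchMethodException ignored) {
                    // keep climbing toward FileInputFormat
                }
            }
            if (method != null) {
                method.setAccessible(true);
                System.out.println("splittable: " + method.invoke(inputFormat, fs, path)); // true: no codec
            }
        }
    }
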
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index edc746ebe24..7946dd5e8a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -53,7 +53,7 @@ public class FileSystemCache {
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getByType(key.type, key.conf);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, key.bindBrokerName);
     }
 
     public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -69,11 +69,13 @@ public class FileSystemCache {
         // eg: hdfs://nameservices1
         private final String fsIdent;
         private final JobConf conf;
+        private final String bindBrokerName;
 
-        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) {
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf, String bindBrokerName) {
             this.type = fs.first;
             this.fsIdent = fs.second;
             this.conf = conf;
+            this.bindBrokerName = bindBrokerName;
         }
 
         @Override
@@ -84,14 +86,21 @@ public class FileSystemCache {
             if (!(obj instanceof FileSystemCacheKey)) {
                 return false;
             }
-            return type.equals(((FileSystemCacheKey) obj).type)
+            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
                     && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
                     && conf == ((FileSystemCacheKey) obj).conf;
+            if (bindBrokerName == null) {
+                return equalsWithoutBroker;
+            }
+            return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(conf, fsIdent, type);
+            if (bindBrokerName == null) {
+                return Objects.hash(conf, fsIdent, type);
+            }
+            return Objects.hash(conf, fsIdent, type, bindBrokerName);
         }
     }
 }
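
The null handling above keeps hashing and equality identical for keys without a broker, so existing cache entries are unaffected. A simplified standalone model of the same rule (the class below is illustrative, not the real FileSystemCacheKey):

    import java.util.Objects;

    // Illustrative stand-in for FileSystemCacheKey: broker-less keys hash and
    // compare exactly as before the patch; broker-bound keys add one component.
    class CacheKeySketch {
        final String type;           // stands in for FileSystemType
        final String fsIdent;        // e.g. "hdfs://nameservices1"
        final String bindBrokerName; // null when no broker is bound

        CacheKeySketch(String type, String fsIdent, String bindBrokerName) {
            this.type = type;
            this.fsIdent = fsIdent;
            this.bindBrokerName = bindBrokerName;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof CacheKeySketch)) {
                return false;
            }
            CacheKeySketch other = (CacheKeySketch) obj;
            boolean equalsWithoutBroker = type.equals(other.type) && fsIdent.equals(other.fsIdent);
            if (bindBrokerName == null) {
                return equalsWithoutBroker;
            }
            return equalsWithoutBroker && bindBrokerName.equals(other.bindBrokerName);
        }

        @Override
        public int hashCode() {
            return bindBrokerName == null
                    ? Objects.hash(fsIdent, type)
                    : Objects.hash(fsIdent, type, bindBrokerName);
        }

        public static void main(String[] args) {
            CacheKeySketch dfs = new CacheKeySketch("DFS", "hdfs://ns1", null);
            CacheKeySketch broker = new CacheKeySketch("BROKER", "hdfs://ns1", "hdfs_broker");
            System.out.println(dfs.equals(broker)); // false: type and broker differ
        }
    }
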
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index 3837a7eb95b..e54a73bbff3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -56,9 +56,11 @@ public class FileSystemFactory {
         }
     }
 
-    public static Pair<FileSystemType, String> getFSIdentity(String location) {
+    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
         FileSystemType fsType;
-        if (S3Util.isObjStorage(location)) {
+        if (bindBrokerName != null) {
+            fsType = FileSystemType.BROKER;
+        } else if (S3Util.isObjStorage(location)) {
             if (S3Util.isHdfsOnOssEndpoint(location)) {
                 // if hdfs service is enabled on oss, use hdfs lib to access oss.
                 fsType = FileSystemType.DFS;
@@ -83,7 +85,8 @@ public class FileSystemFactory {
         return Pair.of(fsType, fsIdent);
     }
 
-    public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
+    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
+                                                       String bindBrokerName) {
         Map<String, String> properties = new HashMap<>();
         conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         switch (type) {
@@ -95,6 +98,8 @@ public class FileSystemFactory {
                 return new OFSFileSystem(properties);
             case JFS:
                 return new JFSFileSystem(properties);
+            case BROKER:
+                return new BrokerFileSystem(bindBrokerName, properties);
             default:
                 throw new IllegalStateException("Not supported file system type: " + type);
         }
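
Callers thread the broker name through both factory entry points. A usage sketch, assuming fe-core and its dependencies on the classpath (the location and broker name are hypothetical):

    import org.apache.doris.common.Pair;
    import org.apache.doris.fs.FileSystemFactory;
    import org.apache.doris.fs.FileSystemType;
    import org.apache.doris.fs.remote.RemoteFileSystem;
    import org.apache.hadoop.mapred.JobConf;

    class BrokerFileSystemWiringSketch {
        public static void main(String[] args) {
            String location = "hdfs://ns1/user/hive/warehouse/t1"; // hypothetical
            String bindBrokerName = "hdfs_broker";                 // hypothetical
            // a non-null broker name forces FileSystemType.BROKER regardless of the URI scheme
            Pair<FileSystemType, String> identity = FileSystemFactory.getFSIdentity(location, bindBrokerName);
            RemoteFileSystem fs = FileSystemFactory.getRemoteFileSystem(identity.first, new JobConf(), bindBrokerName);
            System.out.println(identity.first + " -> " + fs.getClass().getSimpleName()); // BROKER -> BrokerFileSystem
        }
    }
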
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
index 5ddea011744..018130f0c14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
@@ -22,5 +22,6 @@ public enum FileSystemType {
     DFS,
     OFS,
     JFS,
+    BROKER,
     FILE
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
index cb871509288..ef8d484bda9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
@@ -24,8 +24,10 @@ import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.RemoteFiles;
 import org.apache.doris.fs.operations.BrokerFileOperations;
 import org.apache.doris.fs.operations.OpParams;
 import org.apache.doris.service.FrontendOptions;
@@ -34,6 +36,8 @@ import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
 import org.apache.doris.thrift.TBrokerDeletePathRequest;
 import org.apache.doris.thrift.TBrokerFD;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
 import org.apache.doris.thrift.TBrokerOperationStatus;
@@ -65,6 +69,7 @@ import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -556,6 +561,88 @@ public class BrokerFileSystem extends RemoteFileSystem {
         return Status.OK;
     }
 
+    @Override
+    public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
+        // get a proper broker
+        Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
+        if (pair == null) {
+            throw new UserException("failed to get broker client");
+        }
+        TPaloBrokerService.Client client = pair.first;
+        TNetworkAddress address = pair.second;
+
+        // invoke broker 'listLocatedFiles' interface
+        boolean needReturn = true;
+        try {
+            TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
+                    recursive, properties);
+            req.setOnlyFiles(onlyFiles);
+            TBrokerListResponse response = client.listLocatedFiles(req);
+            TBrokerOperationStatus operationStatus = response.getOpStatus();
+            if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("failed to listLocatedFiles, remote path: " + remotePath + ". msg: "
+                    + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+            }
+            List<RemoteFile> result = new ArrayList<>();
+            List<TBrokerFileStatus> fileStatus = response.getFiles();
+            for (TBrokerFileStatus tFile : fileStatus) {
+                org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path);
+                RemoteFile file = new RemoteFile(path.getName(), path, !tFile.isDir, tFile.isDir, tFile.size,
+                        tFile.getBlockSize(), tFile.getModificationTime(), null /* blockLocations is null*/);
+                result.add(file);
+            }
+            LOG.info("finished to listLocatedFiles, remote path {}. get files: {}", remotePath, result);
+            return new RemoteFiles(result);
+        } catch (TException e) {
+            needReturn = false;
+            throw new UserException("failed to listLocatedFiles, remote path: "
+                + remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+        } finally {
+            if (needReturn) {
+                ClientPool.brokerPool.returnObject(address, client);
+            } else {
+                ClientPool.brokerPool.invalidateObject(address, client);
+            }
+        }
+    }
+
+    public boolean isSplittable(String remotePath, String inputFormat) throws UserException {
+        // get a proper broker
+        Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
+        if (pair == null) {
+            throw new UserException("failed to get broker client");
+        }
+        TPaloBrokerService.Client client = pair.first;
+        TNetworkAddress address = pair.second;
+
+        // invoke 'isSplittable' interface
+        boolean needReturn = true;
+        try {
+            TBrokerIsSplittableRequest req = new TBrokerIsSplittableRequest().setVersion(TBrokerVersion.VERSION_ONE)
+                    .setPath(remotePath).setInputFormat(inputFormat).setProperties(properties);
+            TBrokerIsSplittableResponse response = client.isSplittable(req);
+            TBrokerOperationStatus operationStatus = response.getOpStatus();
+            if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("failed to get path isSplittable, remote path: " + remotePath + ". msg: "
+                    + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+            }
+            boolean result = response.isSplittable();
+            LOG.info("finished to get path isSplittable, remote path {} with format {}, isSplittable: {}",
+                    remotePath, inputFormat, result);
+            return result;
+        } catch (TException e) {
+            needReturn = false;
+            throw new UserException("failed to get path isSplittable, remote path: "
+                + remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+        } finally {
+            if (needReturn) {
+                ClientPool.brokerPool.returnObject(address, client);
+            } else {
+                ClientPool.brokerPool.invalidateObject(address, client);
+            }
+        }
+    }
+
     // List files in remotePath
     @Override
     public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 1ba77fa5f9c..d4141810487 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -196,8 +196,14 @@ public class HiveScanNode extends FileQueryScanNode {
             HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                     .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
             boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
+            String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+            if (bindBrokerName != null && useSelfSplitter == false) {
+                // useSelfSplitter must be true if bindBrokerName is set.
+                throw new UserException(HMSExternalCatalog.ENABLE_SELF_SPLITTER + " should be true if "
+                        + HMSExternalCatalog.BIND_BROKER_NAME + " is set");
+            }
             List<Split> allFiles = Lists.newArrayList();
-            getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter);
+            getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter, bindBrokerName);
             LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
                     allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
             return allFiles;
@@ -210,12 +216,13 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-                                          List<Split> allFiles, boolean useSelfSplitter) throws IOException {
+                                          List<Split> allFiles, boolean useSelfSplitter,
+                                          String bindBrokerName) throws IOException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
-            fileCaches = getFileSplitByTransaction(cache, partitions);
+            fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
         } else {
-            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter);
+            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter, bindBrokerName);
         }
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@@ -287,7 +294,8 @@ public class HiveScanNode extends FileQueryScanNode {
         return fileList.subList(0, index);
     }
 
-    private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
+    private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions,
+                                                           String bindBrokerName) {
         for (HivePartition partition : partitions) {
             if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
                 // this is unpartitioned table.
@@ -297,7 +305,8 @@ public class HiveScanNode extends FileQueryScanNode {
         }
         ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
                 ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
-        return cache.getFilesByTransaction(partitions, validWriteIds, hiveTransaction.isFullAcid(), hmsTable.getId());
+        return cache.getFilesByTransaction(partitions, validWriteIds,
+            hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName);
     }
 
     @Override
@@ -319,6 +328,10 @@ public class HiveScanNode extends FileQueryScanNode {
 
     @Override
     protected TFileType getLocationType(String location) throws UserException {
+        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+        if (bindBrokerName != null) {
+            return TFileType.FILE_BROKER;
+        }
         return getTFileType(location).orElseThrow(() ->
             new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 927e556c91a..f2c77026312 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -702,7 +702,8 @@ public class StatisticsUtil {
                     table.getRemoteTable().getSd().getLocation(), null));
         }
         // Get files for all partitions.
-        return cache.getFilesByPartitionsWithoutCache(hivePartitions, true);
+        String bindBrokerName = table.getCatalog().bindBrokerName();
+        return cache.getFilesByPartitionsWithoutCache(hivePartitions, true, bindBrokerName);
     }
 
     /**
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index bbd58e5d5d4..2cb8d892dee 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -69,9 +69,10 @@ under the License.
         <maven.compiler.target>1.8</maven.compiler.target>
         <log4j2.version>2.18.0</log4j2.version>
         <project.scm.id>github</project.scm.id>
-        <hadoop.version>2.10.2</hadoop.version>
+        <hadoop.version>3.3.6</hadoop.version>
         <netty.version>4.1.65.Final</netty.version>
         <gcs.version>hadoop2-2.2.15</gcs.version>
+        <doris.hive.catalog.shade.version>1.0.1</doris.hive.catalog.shade.version>
     </properties>
     <profiles>
         <!-- for custom internal repository -->
@@ -224,6 +225,29 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.doris/hive-catalog-shade -->
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
+            <version>${doris.hive.catalog.shade.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 55ba457404a..22be82e34c2 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -25,17 +25,19 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import com.google.common.base.Strings;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -1061,6 +1063,49 @@ public class FileSystemManager {
         }
     }
 
+    public List<TBrokerFileStatus> listLocatedFiles(String path, boolean onlyFiles,
+                                                    boolean recursive, Map<String, String> properties) {
+        List<TBrokerFileStatus> resultFileStatus = null;
+        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        Path locatedPath = new Path(path);
+        try {
+            FileSystem innerFileSystem = fileSystem.getDFSFileSystem();
+            RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ? innerFileSystem.listFiles(locatedPath, recursive)
+                : innerFileSystem.listLocatedStatus(locatedPath);
+            return getFileLocations(locatedFiles);
+        } catch (FileNotFoundException e) {
+            logger.info("file not found: " + e.getMessage());
+            throw new BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
+                e, "file not found");
+        } catch (Exception e) {
+            logger.error("errors while get file status ", e);
+            fileSystem.closeFileSystem();
+            throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+                e, "unknown error when listLocatedFiles");
+        }
+    }
+
+    private List<TBrokerFileStatus> getFileLocations(RemoteIterator<LocatedFileStatus> locatedFiles) throws IOException {
+        List<TBrokerFileStatus> locations = new ArrayList<>();
+        while (locatedFiles.hasNext()) {
+            LocatedFileStatus fileStatus = locatedFiles.next();
+            TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
+            brokerFileStatus.setPath(fileStatus.getPath().toString());
+            brokerFileStatus.setIsDir(fileStatus.isDirectory());
+            if (fileStatus.isDirectory()) {
+                brokerFileStatus.setIsSplitable(false);
+                brokerFileStatus.setSize(-1);
+            } else {
+                brokerFileStatus.setSize(fileStatus.getLen());
+                brokerFileStatus.setIsSplitable(true);
+            }
+            brokerFileStatus.setModificationTime(fileStatus.getModificationTime());
+            brokerFileStatus.setBlockSize(fileStatus.getBlockSize());
+            locations.add(brokerFileStatus);
+        }
+        return locations;
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
@@ -1282,13 +1327,7 @@ public class FileSystemManager {
         FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd);
         synchronized (fsDataOutputStream) {
             long currentStreamOffset;
-            try {
-                currentStreamOffset = fsDataOutputStream.getPos();
-            } catch (IOException e) {
-                logger.error("errors while get file pos from output stream", e);
-                throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
-                        "errors while get file pos from output stream");
-            }
+            currentStreamOffset = fsDataOutputStream.getPos();
             if (currentStreamOffset != offset) {
                 // it's ok, it means that last pwrite succeed finally
                 if (currentStreamOffset == offset + data.length) {
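
The onlyFiles flag above selects between two Hadoop listing modes: FileSystem#listFiles(path, recursive) yields files only (recursing if asked), while FileSystem#listLocatedStatus(path) yields immediate children, directories included. A runnable sketch against the local file system (the directory is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    class ListingModesSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            Path dir = new Path("file:///tmp"); // hypothetical directory
            // files only, descending into subdirectories
            RemoteIterator<LocatedFileStatus> filesOnly = fs.listFiles(dir, true);
            // immediate children, directories included
            RemoteIterator<LocatedFileStatus> children = fs.listLocatedStatus(dir);
            while (children.hasNext()) {
                LocatedFileStatus st = children.next();
                System.out.println((st.isDirectory() ? "dir  " : "file ") + st.getPath());
            }
        }
    }
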
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
index 14ff74dd41e..816462ecb34 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.doris.broker.hdfs;
 
 import com.google.common.base.Stopwatch;
 import org.apache.doris.common.BrokerPerfMonitor;
+import org.apache.doris.common.HiveUtils;
 import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
 import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
 import org.apache.doris.thrift.TBrokerCloseReaderRequest;
@@ -28,6 +29,8 @@ import org.apache.doris.thrift.TBrokerFD;
 import org.apache.doris.thrift.TBrokerFileSizeRequest;
 import org.apache.doris.thrift.TBrokerFileSizeResponse;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
 import org.apache.doris.thrift.TBrokerOpenReaderRequest;
@@ -86,6 +89,47 @@ public class HDFSBrokerServiceImpl implements TPaloBrokerService.Iface {
         }
     }
 
+    @Override
+    public TBrokerListResponse listLocatedFiles(TBrokerListPathRequest request)
+            throws TException {
+        logger.info("received a listLocatedFiles request, request detail: " + request);
+        TBrokerListResponse response = new TBrokerListResponse();
+        try {
+            boolean recursive = request.isIsRecursive();
+            boolean onlyFiles = false;
+            if (request.isSetOnlyFiles()) {
+                onlyFiles = request.isOnlyFiles();
+            }
+            List<TBrokerFileStatus> fileStatuses = fileSystemManager.listLocatedFiles(request.path,
+                onlyFiles, recursive, request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setFiles(fileStatuses);
+            return response;
+        } catch (BrokerException e) {
+            logger.warn("failed to list path: " + request.path, e);
+            TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
+            response.setOpStatus(errorStatus);
+            return response;
+        }
+    }
+
+    @Override
+    public TBrokerIsSplittableResponse isSplittable(TBrokerIsSplittableRequest request) throws TException {
+        logger.info("received a isSplittable request, request detail: " + request);
+        TBrokerIsSplittableResponse response = new TBrokerIsSplittableResponse();
+        try {
+            boolean isSplittable = HiveUtils.isSplittable(request.path, request.inputFormat, request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setSplittable(isSplittable);
+            return response;
+        } catch (BrokerException e) {
+            logger.warn("failed to get isSplitable with path: " + request.path, e);
+            TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
+            response.setOpStatus(errorStatus);
+            return response;
+        }
+    }
+
     @Override
     public TBrokerOperationStatus deletePath(TBrokerDeletePathRequest request)
             throws TException {
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
new file mode 100644
index 00000000000..f2211eb2026
--- /dev/null
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.broker.hdfs.BrokerException;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+public class HiveUtils {
+    private static final Logger logger = Logger.getLogger(HiveUtils.class.getName());
+
+    public static boolean isSplittable(String path, String inputFormatName,
+                                       Map<String, String> properties) throws BrokerException {
+        JobConf jobConf = getJobConf(properties);
+        InputFormat inputFormat = getInputFormat(jobConf, inputFormatName);
+        return isSplittableInternal(inputFormat, new Path(path), jobConf);
+    }
+
+    private static JobConf getJobConf(Map<String, String> properties) {
+        Configuration configuration = new Configuration();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            configuration.set(entry.getKey(), entry.getValue());
+        }
+        return new JobConf(configuration);
+    }
+
+    private static InputFormat<?, ?> getInputFormat(JobConf jobConf, String inputFormatName) throws BrokerException {
+        try {
+            Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
+            if (inputFormatClass == SymlinkTextInputFormat.class) {
+                // symlink targets are always TextInputFormat
+                inputFormatClass = TextInputFormat.class;
+            }
+
+            return ReflectionUtils.newInstance(inputFormatClass, jobConf);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                "Unable to create input format " + inputFormatName, e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "RedundantCast"})
+    private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
+        throws ClassNotFoundException {
+        // CDH uses different names for Parquet
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+            || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+            return MapredParquetInputFormat.class;
+        }
+
+        Class<?> clazz = conf.getClassByName(inputFormatName);
+        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+    }
+
+    private static boolean isSplittableInternal(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+        // ORC uses a custom InputFormat but is always splittable
+        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
+            return true;
+        }
+
+        // use reflection to get isSplittable method on FileInputFormat
+        Method method = null;
+        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
+                break;
+            } catch (NoSuchMethodException ignored) {
+                logger.warn(LoggerMessageFormat.format("Class {} doesn't contain isSplitable method", clazz));
+            }
+        }
+
+        if (method == null) {
+            return false;
+        }
+        try {
+            method.setAccessible(true);
+            return (boolean) method.invoke(inputFormat, path.getFileSystem(jobConf), path);
+        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
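
On the broker side the whole check reduces to one static call. A hedged usage sketch (the file path is hypothetical; with an empty property map the check runs against the default local Hadoop configuration):

    import java.util.HashMap;

    import org.apache.doris.common.HiveUtils;

    class BrokerSplittableCheckSketch {
        public static void main(String[] args) throws Exception {
            boolean splittable = HiveUtils.isSplittable(
                    "file:///tmp/t1/part-00000",                // hypothetical file
                    "org.apache.hadoop.mapred.TextInputFormat", // input format class by name
                    new HashMap<>());                           // e.g. auth properties
            System.out.println("splittable: " + splittable);    // true for uncompressed text
        }
    }
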
diff --git a/gensrc/thrift/PaloBrokerService.thrift b/gensrc/thrift/PaloBrokerService.thrift
index 308c606544e..e4bc60a201f 100644
--- a/gensrc/thrift/PaloBrokerService.thrift
+++ b/gensrc/thrift/PaloBrokerService.thrift
@@ -91,12 +91,25 @@ struct TBrokerCheckPathExistResponse {
     2: required bool isPathExist;
 }
 
+struct TBrokerIsSplittableResponse {
+    1: optional TBrokerOperationStatus opStatus;
+    2: optional bool splittable;
+}
+
 struct TBrokerListPathRequest {
     1: required TBrokerVersion version;
     2: required string path;
     3: required bool isRecursive;
     4: required map<string,string> properties;
     5: optional bool fileNameOnly;
+    6: optional bool onlyFiles;
+}
+
+struct TBrokerIsSplittableRequest {
+    1: optional TBrokerVersion version;
+    2: optional string path;
+    3: optional string inputFormat;
+    4: optional map<string,string> properties;
 }
 
 struct TBrokerDeletePathRequest {
@@ -184,6 +197,13 @@ service TPaloBrokerService {
     
     // return a list of files under a path
     TBrokerListResponse listPath(1: TBrokerListPathRequest request);
+
+    // return located files of a given path. A broker implementation refers to
+    // 'org.apache.doris.fs.remote.RemoteFileSystem#listLocatedFiles' in fe-core.
+    TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request);
+
+    // return whether the path with specified input format is splittable.
+    TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest request);
     
     // delete a file, if the deletion of the file fails, the status code will return an error message
     // input:

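Since the RPC is plain Thrift, any client of the generated TPaloBrokerService can call it. A minimal sketch (the endpoint, port, and the unframed-binary protocol choice are assumptions; FE itself goes through ClientPool as shown in BrokerFileSystem above):

    import java.util.HashMap;

    import org.apache.doris.thrift.TBrokerIsSplittableRequest;
    import org.apache.doris.thrift.TBrokerIsSplittableResponse;
    import org.apache.doris.thrift.TBrokerOperationStatusCode;
    import org.apache.doris.thrift.TBrokerVersion;
    import org.apache.doris.thrift.TPaloBrokerService;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;

    class IsSplittableRpcSketch {
        public static void main(String[] args) throws Exception {
            TSocket transport = new TSocket("127.0.0.1", 8000); // hypothetical broker endpoint
            transport.open();
            try {
                TPaloBrokerService.Client client = new TPaloBrokerService.Client(new TBinaryProtocol(transport));
                TBrokerIsSplittableRequest req = new TBrokerIsSplittableRequest()
                        .setVersion(TBrokerVersion.VERSION_ONE)
                        .setPath("hdfs://ns1/warehouse/t1/part-00000")              // hypothetical
                        .setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
                        .setProperties(new HashMap<>());
                TBrokerIsSplittableResponse resp = client.isSplittable(req);
                if (resp.getOpStatus().getStatusCode() == TBrokerOperationStatusCode.OK) {
                    System.out.println("splittable: " + resp.isSplittable());
                }
            } finally {
                transport.close();
            }
        }
    }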
