This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 05cedfca4e5 [fix](hudi) catch exception when getting hudi partition 
(#35027) (#35159)
05cedfca4e5 is described below

commit 05cedfca4e5238bcfe610c1cc573c34e20ed57fd
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed May 22 18:44:19 2024 +0800

    [fix](hudi) catch exception when getting hudi partition (#35027) (#35159)
    
    bp #35027
---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  2 +-
 .../org/apache/doris/datasource/hive/HiveUtil.java |  2 +-
 .../doris/datasource/hudi/source/HudiScanNode.java | 78 ++++++++++++----------
 3 files changed, 46 insertions(+), 36 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index b97284eda9c..a22e951be40 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -355,7 +355,7 @@ public class HiveMetaStoreCache {
         RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new 
FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
                         location, bindBrokerName), properties, 
bindBrokerName));
-        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, 
jobConf));
+        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
         // For Tez engine, it may generate subdirectoies for "union" query.
         // So there may be files and directories in the table directory at the 
same time. eg:
         //      /user/hive/warehouse/region_tmp_union_all2/000000_0
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index bca04215fc4..5ca42dd0245 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -110,7 +110,7 @@ public final class HiveUtil {
     }
 
     public static boolean isSplittable(RemoteFileSystem remoteFileSystem, 
String inputFormat,
-            String location, JobConf jobConf) throws UserException {
+            String location) throws UserException {
         if (remoteFileSystem instanceof BrokerFileSystem) {
             return ((BrokerFileSystem) 
remoteFileSystem).isSplittable(location, inputFormat);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 8dd853a48f3..61edc333f6c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -77,6 +77,7 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class HudiScanNode extends HiveScanNode {
@@ -329,49 +330,58 @@ public class HudiScanNode extends HiveScanNode {
     private void getPartitionSplits(List<HivePartition> partitions, 
List<Split> splits) {
         Executor executor = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
         CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
         partitions.forEach(partition -> executor.execute(() -> {
-            String globPath;
-            String partitionName = "";
-            if (partition.isDummyPartition()) {
-                globPath = hudiClient.getBasePathV2().toString() + "/*";
-            } else {
-                partitionName = 
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                        new Path(partition.getPath()));
-                globPath = String.format("%s/%s/*", 
hudiClient.getBasePathV2().toString(), partitionName);
-            }
-            List<FileStatus> statuses;
             try {
-                statuses = 
FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
-                        new Path(globPath));
-            } catch (IOException e) {
-                throw new RuntimeException("Failed to get hudi file statuses 
on path: " + globPath, e);
-            }
-            HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(hudiClient,
-                    timeline, statuses.toArray(new FileStatus[0]));
-
-            if (isCowOrRoTable) {
-                fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, 
queryInstant).forEach(baseFile -> {
-                    noLogsSplitNum.incrementAndGet();
-                    String filePath = baseFile.getPath();
-                    long fileSize = baseFile.getFileSize();
-                    // Need add hdfs host to location
-                    LocationPath locationPath = new LocationPath(filePath, 
hmsTable.getCatalogProperties());
-                    Path splitFilePath = locationPath.toStorageLocation();
-                    splits.add(new FileSplit(splitFilePath, 0, fileSize, 
fileSize,
-                            new String[0], partition.getPartitionValues()));
-                });
-            } else {
-                
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
-                        .forEach(fileSlice -> splits.add(
-                                generateHudiSplit(fileSlice, 
partition.getPartitionValues(), queryInstant)));
+                String globPath;
+                String partitionName = "";
+                if (partition.isDummyPartition()) {
+                    globPath = hudiClient.getBasePathV2().toString() + "/*";
+                } else {
+                    partitionName = 
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                            new Path(partition.getPath()));
+                    globPath = String.format("%s/%s/*", 
hudiClient.getBasePathV2().toString(), partitionName);
+                }
+                List<FileStatus> statuses;
+                try {
+                    statuses = 
FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
+                            new Path(globPath));
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to get hudi file 
statuses on path: " + globPath, e);
+                }
+                HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(hudiClient,
+                        timeline, statuses.toArray(new FileStatus[0]));
+
+                if (isCowOrRoTable) {
+                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, 
queryInstant).forEach(baseFile -> {
+                        noLogsSplitNum.incrementAndGet();
+                        String filePath = baseFile.getPath();
+                        long fileSize = baseFile.getFileSize();
+                        // Need add hdfs host to location
+                        LocationPath locationPath = new LocationPath(filePath, 
hmsTable.getCatalogProperties());
+                        Path splitFilePath = locationPath.toStorageLocation();
+                        splits.add(new FileSplit(splitFilePath, 0, fileSize, 
fileSize,
+                                new String[0], 
partition.getPartitionValues()));
+                    });
+                } else {
+                    
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                            .forEach(fileSlice -> splits.add(
+                                    generateHudiSplit(fileSlice, 
partition.getPartitionValues(), queryInstant)));
+                }
+            } catch (Throwable t) {
+                throwable.set(t);
+            } finally {
+                countDownLatch.countDown();
             }
-            countDownLatch.countDown();
         }));
         try {
             countDownLatch.await();
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+        if (throwable.get() != null) {
+            throw new RuntimeException(throwable.get().getMessage(), 
throwable.get());
+        }
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to