This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 05cedfca4e5 [fix](hudi) catch exception when getting hudi partition (#35027) (#35159)
05cedfca4e5 is described below
commit 05cedfca4e5238bcfe610c1cc573c34e20ed57fd
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed May 22 18:44:19 2024 +0800
[fix](hudi) catch exception when getting hudi partition (#35027) (#35159)
bp #35027
---
.../doris/datasource/hive/HiveMetaStoreCache.java | 2 +-
.../org/apache/doris/datasource/hive/HiveUtil.java | 2 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 78 ++++++++++++----------
3 files changed, 46 insertions(+), 36 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index b97284eda9c..a22e951be40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -355,7 +355,7 @@ public class HiveMetaStoreCache {
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new
FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
location, bindBrokerName), properties,
bindBrokerName));
- result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
+ result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
// For Tez engine, it may generate subdirectoies for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
// /user/hive/warehouse/region_tmp_union_all2/000000_0
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index bca04215fc4..5ca42dd0245 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -110,7 +110,7 @@ public final class HiveUtil {
}
public static boolean isSplittable(RemoteFileSystem remoteFileSystem,
String inputFormat,
- String location, JobConf jobConf) throws UserException {
+ String location) throws UserException {
if (remoteFileSystem instanceof BrokerFileSystem) {
return ((BrokerFileSystem)
remoteFileSystem).isSplittable(location, inputFormat);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 8dd853a48f3..61edc333f6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -77,6 +77,7 @@ import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class HudiScanNode extends HiveScanNode {
@@ -329,49 +330,58 @@ public class HudiScanNode extends HiveScanNode {
private void getPartitionSplits(List<HivePartition> partitions,
List<Split> splits) {
Executor executor =
Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+ AtomicReference<Throwable> throwable = new AtomicReference<>();
partitions.forEach(partition -> executor.execute(() -> {
- String globPath;
- String partitionName = "";
- if (partition.isDummyPartition()) {
- globPath = hudiClient.getBasePathV2().toString() + "/*";
- } else {
- partitionName =
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
- new Path(partition.getPath()));
- globPath = String.format("%s/%s/*",
hudiClient.getBasePathV2().toString(), partitionName);
- }
- List<FileStatus> statuses;
try {
- statuses =
FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
- new Path(globPath));
- } catch (IOException e) {
- throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
- }
- HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(hudiClient,
- timeline, statuses.toArray(new FileStatus[0]));
-
- if (isCowOrRoTable) {
- fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName,
queryInstant).forEach(baseFile -> {
- noLogsSplitNum.incrementAndGet();
- String filePath = baseFile.getPath();
- long fileSize = baseFile.getFileSize();
- // Need add hdfs host to location
- LocationPath locationPath = new LocationPath(filePath,
hmsTable.getCatalogProperties());
- Path splitFilePath = locationPath.toStorageLocation();
- splits.add(new FileSplit(splitFilePath, 0, fileSize,
fileSize,
- new String[0], partition.getPartitionValues()));
- });
- } else {
-
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
- .forEach(fileSlice -> splits.add(
- generateHudiSplit(fileSlice,
partition.getPartitionValues(), queryInstant)));
+ String globPath;
+ String partitionName = "";
+ if (partition.isDummyPartition()) {
+ globPath = hudiClient.getBasePathV2().toString() + "/*";
+ } else {
+ partitionName =
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+ new Path(partition.getPath()));
+ globPath = String.format("%s/%s/*",
hudiClient.getBasePathV2().toString(), partitionName);
+ }
+ List<FileStatus> statuses;
+ try {
+ statuses =
FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
+ new Path(globPath));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
+ }
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(hudiClient,
+ timeline, statuses.toArray(new FileStatus[0]));
+
+ if (isCowOrRoTable) {
+ fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName,
queryInstant).forEach(baseFile -> {
+ noLogsSplitNum.incrementAndGet();
+ String filePath = baseFile.getPath();
+ long fileSize = baseFile.getFileSize();
+ // Need add hdfs host to location
+ LocationPath locationPath = new LocationPath(filePath,
hmsTable.getCatalogProperties());
+ Path splitFilePath = locationPath.toStorageLocation();
+ splits.add(new FileSplit(splitFilePath, 0, fileSize,
fileSize,
+ new String[0],
partition.getPartitionValues()));
+ });
+ } else {
+
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+ .forEach(fileSlice -> splits.add(
+ generateHudiSplit(fileSlice,
partition.getPartitionValues(), queryInstant)));
+ }
+ } catch (Throwable t) {
+ throwable.set(t);
+ } finally {
+ countDownLatch.countDown();
}
- countDownLatch.countDown();
}));
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
}
+ if (throwable.get() != null) {
+ throw new RuntimeException(throwable.get().getMessage(),
throwable.get());
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]