This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb165cc9f [opt](hudi) get hudi split concurrently by using parallelStream (#21871)
dcb165cc9f is described below

commit dcb165cc9fd5a05b317ee6943f27fc98a02a168e
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Jul 18 23:19:34 2023 +0800

    [opt](hudi) get hudi split concurrently by using parallelStream (#21871)
    
    This PR contains two optimizations (a sketch of both patterns follows):
    1. Use a parallel stream to get hoodie splits concurrently. This reduces
    the split time from 1min20s to 12s when splitting 10,000 partitions.
    2. Read the hoodie meta table to get the table partitions. This reduces
    the partition listing time from 12min to 3s when reading 10,000 partitions.
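    
    Below is a minimal, illustrative sketch of the two patterns, assuming the
    Hudi APIs visible in this diff; the HoodieLocalEngineContext construction is
    inferred from surrounding context, and discoverSplits is a hypothetical
    stand-in for the per-partition file listing, not an API from this patch.
    Note the actual HudiScanNode change fans work out to a shared Executor with
    a CountDownLatch rather than a literal parallelStream.
    
        import org.apache.hudi.common.config.HoodieMetadataConfig;
        import org.apache.hudi.common.engine.HoodieLocalEngineContext;
        import org.apache.hudi.common.table.HoodieTableMetaClient;
        import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
        import org.apache.hudi.metadata.HoodieTableMetadata;
        import org.apache.hudi.metadata.HoodieTableMetadataUtil;
        
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.Collections;
        import java.util.List;
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.Executor;
        import java.util.function.Function;
        
        public class HudiSplitSketch {
            // Optimization 2: list partitions from the Hudi metadata table
            // instead of globbing the file system (12min -> 3s).
            static List<String> listPartitions(HoodieTableMetaClient metaClient) throws IOException {
                HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                        .enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
                        .build();
                return HoodieTableMetadata.create(
                        new HoodieLocalEngineContext(metaClient.getHadoopConf()), metadataConfig,
                        metaClient.getBasePathV2().toString(),
                        FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true)
                        .getAllPartitionPaths();
            }
        
            // Optimization 1: discover splits for all partitions concurrently on a
            // shared executor, collecting into a thread-safe list (1min20s -> 12s).
            static <P, S> List<S> getSplitsConcurrently(List<P> partitions, Executor executor,
                    Function<P, List<S>> discoverSplits) throws InterruptedException {
                List<S> splits = Collections.synchronizedList(new ArrayList<>());
                CountDownLatch latch = new CountDownLatch(partitions.size());
                for (P partition : partitions) {
                    executor.execute(() -> {
                        try {
                            splits.addAll(discoverSplits.apply(partition));
                        } finally {
                            latch.countDown(); // always count down so await() cannot hang
                        }
                    });
                }
                latch.await(); // block until every partition task finishes
                return splits;
            }
        }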
---
 .../planner/external/TablePartitionValues.java     |   2 +-
 .../hudi/HudiCachedPartitionProcessor.java         |  50 ++++++--
 .../external/hudi/HudiPartitionProcessor.java      |  35 +++---
 .../doris/planner/external/hudi/HudiScanNode.java  | 140 ++++++++++++---------
 4 files changed, 138 insertions(+), 89 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
index bcc967501c..87f11e5863 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
@@ -171,7 +171,7 @@ public class TablePartitionValues {
         return readWriteLock.writeLock();
     }
 
-    private void cleanPartitions() {
+    public void cleanPartitions() {
         nextPartitionId = 0;
         idToPartitionItem.clear();
         partitionNameToIdMap.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
index ab6a8839b5..37225c2339 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.CacheException;
 import org.apache.doris.planner.external.TablePartitionValues;
 import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -39,10 +40,12 @@ import java.util.stream.Collectors;
 
 public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
     private final long catalogId;
+    private final Executor executor;
     private final LoadingCache<TablePartitionKey, TablePartitionValues> partitionCache;
 
     public HudiCachedPartitionProcessor(long catalogId, Executor executor) {
         this.catalogId = catalogId;
+        this.executor = executor;
+        this.partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num)
                 .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
                 .build(CacheLoader.asyncReloading(
@@ -54,6 +57,10 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
                         }, executor));
     }
 
+    public Executor getExecutor() {
+        return executor;
+    }
+
     @Override
     public void cleanUp() {
         partitionCache.cleanUp();
@@ -63,7 +70,6 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
     public void cleanDatabasePartitions(String dbName) {
         partitionCache.asMap().keySet().stream().filter(k -> k.getDbName().equals(dbName)).collect(Collectors.toList())
                 .forEach(partitionCache::invalidate);
-
     }
 
     @Override
@@ -74,9 +80,34 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
                 .forEach(partitionCache::invalidate);
     }
 
+    public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
+            HoodieTableMetaClient tableMetaClient, String timestamp) {
+        Preconditions.checkState(catalogId == table.getCatalog().getId());
+        Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
+        if (!partitionColumns.isPresent()) {
+            return null;
+        }
+        HoodieTimeline timeline = tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+        Option<HoodieInstant> lastInstant = timeline.lastInstant();
+        if (!lastInstant.isPresent()) {
+            return null;
+        }
+        long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
+        if (Long.parseLong(timestamp) == lastTimestamp) {
+            return getPartitionValues(table, tableMetaClient);
+        }
+        List<String> partitionNames = getPartitionNamesBeforeOrEquals(timeline, timestamp);
+        List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
+        TablePartitionValues partitionValues = new TablePartitionValues();
+        partitionValues.addPartitions(partitionNames,
+                partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p))
+                        .collect(Collectors.toList()), table.getPartitionColumnTypes());
+        return partitionValues;
+    }
+
     public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient)
             throws CacheException {
-        assert (catalogId == table.getCatalog().getId());
+        Preconditions.checkState(catalogId == table.getCatalog().getId());
         Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
         if (!partitionColumns.isPresent()) {
             return null;
@@ -93,10 +124,9 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
             partitionValues.readLock().lock();
             try {
                 long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp();
-                if (lastTimestamp == lastUpdateTimestamp) {
+                if (lastTimestamp <= lastUpdateTimestamp) {
                     return partitionValues;
                 }
-                assert (lastTimestamp > lastUpdateTimestamp);
             } finally {
                 partitionValues.readLock().unlock();
             }
@@ -104,18 +134,12 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
             partitionValues.writeLock().lock();
             try {
                 long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp();
-                if (lastTimestamp == lastUpdateTimestamp) {
+                if (lastTimestamp <= lastUpdateTimestamp) {
                     return partitionValues;
                 }
-                assert (lastTimestamp > lastUpdateTimestamp);
-                List<String> partitionNames;
-                if (lastUpdateTimestamp == 0) {
-                    partitionNames = getAllPartitionNames(tableMetaClient);
-                } else {
-                    partitionNames = getPartitionNamesInRange(timeline, String.valueOf(lastUpdateTimestamp),
-                            String.valueOf(lastTimestamp));
-                }
+                List<String> partitionNames = getAllPartitionNames(tableMetaClient);
                 List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
+                partitionValues.cleanPartitions();
                 partitionValues.addPartitions(partitionNames,
                         partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p))
                                 .collect(Collectors.toList()), table.getPartitionColumnTypes());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java
index 3be3e1f080..807bb37d46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.doris.planner.external.hudi;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -47,12 +46,8 @@ public abstract class HudiPartitionProcessor {
     }
 
     public List<String> getAllPartitionNames(HoodieTableMetaClient tableMetaClient) throws IOException {
-        TypedProperties configProperties = new TypedProperties();
         HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
-                .fromProperties(configProperties)
-                .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key(),
-                        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
-                        && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient))
+                .enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient))
                 .build();
 
         HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
@@ -60,19 +55,29 @@ public abstract class HudiPartitionProcessor {
                 tableMetaClient.getBasePathV2().toString(),
                 FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true);
 
-        return newTableMetadata.getPartitionPathWithPathPrefixes(Collections.singletonList(""));
+        return newTableMetadata.getAllPartitionPaths();
+    }
+
+    public List<String> getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, String timestamp) {
+        return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
+                timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant -> {
+                    try {
+                        return TimelineUtils.getCommitMetadata(instant, timeline);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e.getMessage(), e);
+                    }
+                }).collect(Collectors.toList())));
     }
 
     public List<String> getPartitionNamesInRange(HoodieTimeline timeline, String startTimestamp, String endTimestamp) {
         return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
-                timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream()
-                        .map(instant -> {
-                            try {
-                                return TimelineUtils.getCommitMetadata(instant, timeline);
-                            } catch (IOException e) {
-                                throw new RuntimeException(e.getMessage(), e);
-                            }
-                        }).collect(Collectors.toList())));
+                timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream().map(instant -> {
+                    try {
+                        return TimelineUtils.getCommitMetadata(instant, timeline);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e.getMessage(), e);
+                    }
+                }).collect(Collectors.toList())));
     }
 
     public static List<String> parsePartitionValues(List<String> partitionColumns, String partitionPath) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 734b3943a9..20a3a1cf6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -61,6 +61,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -69,6 +70,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class HudiScanNode extends HiveScanNode {
@@ -77,7 +81,7 @@ public class HudiScanNode extends HiveScanNode {
 
     private final boolean isCowTable;
 
-    private long noLogsSplitNum = 0;
+    private final AtomicLong noLogsSplitNum = new AtomicLong(0);
 
     /**
      * External file scan node for Query Hudi table
@@ -147,12 +151,18 @@ public class HudiScanNode extends HiveScanNode {
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
-    private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient metaClient) throws AnalysisException {
+    private List<HivePartition> getPrunedPartitions(
+            HoodieTableMetaClient metaClient, Option<String> snapshotTimestamp) throws AnalysisException {
         List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
         if (!partitionColumnTypes.isEmpty()) {
             HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
                     .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
-            TablePartitionValues partitionValues = processor.getPartitionValues(hmsTable, metaClient);
+            TablePartitionValues partitionValues;
+            if (snapshotTimestamp.isPresent()) {
+                partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get());
+            } else {
+                partitionValues = processor.getPartitionValues(hmsTable, metaClient);
+            }
             if (partitionValues != null) {
                 // 2. prune partitions by expr
                 partitionValues.readLock().lock();
@@ -215,7 +225,6 @@ public class HudiScanNode extends HiveScanNode {
         List<FieldSchema> allFields = hmsTable.getRemoteTable().getSd().getCols();
         allFields.addAll(hmsTable.getRemoteTable().getPartitionKeys());
 
-        List<Split> splits = new ArrayList<>();
         for (Schema.Field hudiField : hudiSchema.getFields()) {
             String columnName = hudiField.name().toLowerCase(Locale.ROOT);
             // keep hive metastore column in hudi avro schema.
@@ -231,14 +240,17 @@ public class HudiScanNode extends HiveScanNode {
 
         HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
         String queryInstant;
+        Option<String> snapshotTimestamp;
         if (desc.getRef().getTableSnapshot() != null) {
             queryInstant = desc.getRef().getTableSnapshot().getTime();
+            snapshotTimestamp = Option.of(queryInstant);
         } else {
             Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
             if (!snapshotInstant.isPresent()) {
                 return Collections.emptyList();
             }
             queryInstant = snapshotInstant.get().getTimestamp();
+            snapshotTimestamp = Option.empty();
         }
         // Non partition table will get one dummy partition
         UserGroupInformation ugi = HiveMetaStoreClientHelper.getUserGroupInformation(
@@ -247,71 +259,79 @@ public class HudiScanNode extends HiveScanNode {
         if (ugi != null) {
             try {
                 partitions = ugi.doAs(
-                        (PrivilegedExceptionAction<List<HivePartition>>) () -> getPrunedPartitions(hudiClient));
+                        (PrivilegedExceptionAction<List<HivePartition>>) () -> getPrunedPartitions(hudiClient,
+                                snapshotTimestamp));
             } catch (Exception e) {
                 throw new UserException(e);
             }
         } else {
-            partitions = getPrunedPartitions(hudiClient);
+            partitions = getPrunedPartitions(hudiClient, snapshotTimestamp);
         }
-        try {
-            for (HivePartition partition : partitions) {
-                String globPath;
-                String partitionName = "";
-                if (partition.isDummyPartition()) {
-                    globPath = hudiClient.getBasePathV2().toString() + "/*";
-                } else {
-                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                            new Path(partition.getPath()));
-                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
-                }
-                List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
+        Executor executor = ((HudiCachedPartitionProcessor) Env.getCurrentEnv()
+                .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor();
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+        partitions.forEach(partition -> executor.execute(() -> {
+            String globPath;
+            String partitionName = "";
+            if (partition.isDummyPartition()) {
+                globPath = hudiClient.getBasePathV2().toString() + "/*";
+            } else {
+                partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                        new Path(partition.getPath()));
+                globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+            }
+            List<FileStatus> statuses;
+            try {
+                statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
                         new Path(globPath));
-                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
-                        timeline, statuses.toArray(new FileStatus[0]));
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
+            }
+            HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+                    timeline, statuses.toArray(new FileStatus[0]));
 
-                if (isCowTable) {
-                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
-                        noLogsSplitNum++;
-                        String filePath = baseFile.getPath();
-                        long fileSize = baseFile.getFileSize();
-                        FileSplit split = new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
-                                partition.getPartitionValues());
-                        splits.add(split);
-                    });
-                } else {
-                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
-                            .forEach(fileSlice -> {
-                                Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
-                                String filePath = baseFile.map(BaseFile::getPath).orElse("");
-                                long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
+            if (isCowTable) {
+                fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+                    noLogsSplitNum.incrementAndGet();
+                    String filePath = baseFile.getPath();
+                    long fileSize = baseFile.getFileSize();
+                    splits.add(new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
+                            partition.getPartitionValues()));
+                });
+            } else {
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {
+                    Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
+                    String filePath = baseFile.map(BaseFile::getPath).orElse("");
+                    long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
 
-                                List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
-                                        .map(Path::toString)
-                                        .collect(Collectors.toList());
-                                if (logs.isEmpty()) {
-                                    noLogsSplitNum++;
-                                }
+                    List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
+                            .map(Path::toString)
+                            .collect(Collectors.toList());
+                    if (logs.isEmpty()) {
+                        noLogsSplitNum.incrementAndGet();
+                    }
 
-                                HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
-                                        new String[0], partition.getPartitionValues());
-                                split.setTableFormatType(TableFormatType.HUDI);
-                                split.setDataFilePath(filePath);
-                                split.setHudiDeltaLogs(logs);
-                                split.setInputFormat(inputFormat);
-                                split.setSerde(serdeLib);
-                                split.setBasePath(basePath);
-                                split.setHudiColumnNames(columnNames);
-                                split.setHudiColumnTypes(columnTypes);
-                                split.setInstantTime(queryInstant);
-                                splits.add(split);
-                            });
-                }
+                    HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
+                            new String[0], partition.getPartitionValues());
+                    split.setTableFormatType(TableFormatType.HUDI);
+                    split.setDataFilePath(filePath);
+                    split.setHudiDeltaLogs(logs);
+                    split.setInputFormat(inputFormat);
+                    split.setSerde(serdeLib);
+                    split.setBasePath(basePath);
+                    split.setHudiColumnNames(columnNames);
+                    split.setHudiColumnTypes(columnTypes);
+                    split.setInstantTime(queryInstant);
+                    splits.add(split);
+                });
             }
-        } catch (Exception e) {
-            String errorMsg = String.format("Failed to get hudi info on basePath: %s", basePath);
-            LOG.error(errorMsg, e);
-            throw new IllegalArgumentException(errorMsg, e);
+            countDownLatch.countDown();
+        }));
+        try {
+            countDownLatch.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e.getMessage(), e);
         }
         return splits;
     }
@@ -319,6 +339,6 @@ public class HudiScanNode extends HiveScanNode {
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         return super.getNodeExplainString(prefix, detailLevel)
-                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum);
+                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
