This is an automated email from the ASF dual-hosted git repository.
ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 04008333d40 [opt](split) generate and get split batch concurrently (#36045)
04008333d40 is described below
commit 04008333d4081d3502330c8c78bd32634699aa6f
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Jun 19 16:15:47 2024 +0800
[opt](split) generate and get split batch concurrently (#36045)
## Proposed changes
Generate and fetch split batches concurrently.
`SplitSource.getNextBatch` drops its method-level synchronization so that
each backend fetches its splits concurrently, while `SplitAssignment`
generates splits asynchronously in the background.
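
For illustration, here is a minimal, self-contained sketch of the hand-off pattern this change moves to (class and method names are hypothetical, not the actual Doris types): a producer thread enqueues split batches into an unbounded `BlockingQueue`, and the consumer polls with a short timeout instead of serializing behind a synchronized method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: one queue per backend; the generator fills it
// asynchronously while the backend's fetcher drains it without a shared lock.
class SplitHandoffSketch {
    private final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);

    // Producer: pushes batches as they are generated, then marks completion.
    void startProducer(List<List<String>> batches) {
        new Thread(() -> {
            for (List<String> batch : batches) {
                queue.offer(batch); // unbounded queue: offer cannot fail here
            }
            scheduleFinished.set(true);
        }).start();
    }

    // Consumer: polls with a short timeout and returns early with a partial
    // batch once it has waited long enough, so scanners are never starved.
    List<String> getNextBatch(int maxBatchSize, long pollMs, long maxWaitMs)
            throws InterruptedException {
        List<String> scanRanges = new ArrayList<>(maxBatchSize);
        long waited = 0;
        while (scanRanges.size() < maxBatchSize) {
            List<String> batch = queue.poll(pollMs, TimeUnit.MILLISECONDS);
            if (batch == null) {
                if (scheduleFinished.get() && queue.isEmpty()) {
                    break; // producer is done and nothing is left
                }
                waited += pollMs;
                if (waited >= maxWaitMs && !scanRanges.isEmpty()) {
                    break; // return what we have rather than keep waiting
                }
                continue;
            }
            scanRanges.addAll(batch);
        }
        return scanRanges;
    }
}
```

The early return on a partial batch mirrors the `WAIT_TIME_OUT`/`MAX_WAIT_TIME_OUT` polling in the new `SplitSource.getNextBatch` below.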
---
be/src/common/config.cpp | 2 +-
be/src/vec/exec/scan/split_source_connector.cpp | 1 +
be/src/vec/exec/scan/split_source_connector.h | 6 +
be/src/vec/exec/scan/vfile_scanner.cpp | 2 +
be/src/vec/exec/scan/vfile_scanner.h | 1 +
.../org/apache/doris/common/util/LocationPath.java | 26 ++--
.../apache/doris/datasource/ExternalCatalog.java | 6 +-
.../doris/datasource/ExternalMetaCacheMgr.java | 10 ++
.../apache/doris/datasource/FileQueryScanNode.java | 25 +++-
.../org/apache/doris/datasource/FileScanNode.java | 6 +-
.../apache/doris/datasource/SplitAssignment.java | 158 ++++++++++++++++-----
.../apache/doris/datasource/SplitGenerator.java | 26 ++--
.../org/apache/doris/datasource/SplitSource.java | 64 +++++----
.../doris/datasource/hive/HiveMetaStoreCache.java | 45 +++---
.../doris/datasource/hive/source/HiveScanNode.java | 96 ++++++++-----
.../doris/datasource/hudi/source/HudiScanNode.java | 144 +++++++++++--------
.../java/org/apache/doris/planner/ScanNode.java | 4 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 6 +
18 files changed, 402 insertions(+), 226 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b7fdac5f660..5eb0e8d26ba 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -248,7 +248,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
}
return true;
});
-DEFINE_Int32(remote_split_source_batch_size, "1024");
+DEFINE_Int32(remote_split_source_batch_size, "10240");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp
index 2d35d3796bc..fae65543e53 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -45,6 +45,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
std::lock_guard<std::mutex> l(_range_lock);
*has_next = false;
if (_scan_index == _scan_ranges.size() && !_last_batch) {
+ SCOPED_RAW_TIMER(&_get_split_timer);
Status coord_status;
FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(),
_state->get_query_ctx()->coord_addr, &coord_status);
diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h
index cf358846b30..bfda961df34 100644
--- a/be/src/vec/exec/scan/split_source_connector.h
+++ b/be/src/vec/exec/scan/split_source_connector.h
@@ -43,6 +43,8 @@ public:
virtual int num_scan_ranges() = 0;
virtual TFileScanRangeParams* get_params() = 0;
+
+ virtual int64_t get_split_time() { return 0; }
};
/**
@@ -95,6 +97,8 @@ private:
int _scan_index = 0;
int _range_index = 0;
+ int64_t _get_split_timer = 0;
+
public:
RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits)
: _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {}
@@ -110,6 +114,8 @@ public:
TFileScanRangeParams* get_params() override {
LOG(FATAL) << "Unreachable, params is got by
file_scan_range_params_map";
}
+
+ int64_t get_split_time() override { return _get_split_timer; }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index f6f029b9de0..4932e164649 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -138,6 +138,7 @@ Status VFileScanner::prepare(
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber",
TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_local_state->scanner_profile(),
"HasFullyRfFileNumber", TUnit::UNIT);
+ _get_split_timer = ADD_TIMER(_local_state->scanner_profile(),
"GetSplitTime");
_file_cache_statistics.reset(new io::FileCacheStatistics());
_io_ctx.reset(new io::IOContext());
@@ -1162,6 +1163,7 @@ Status VFileScanner::close(RuntimeState* state) {
if (_cur_reader) {
RETURN_IF_ERROR(_cur_reader->close());
}
+ COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time());
RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 332bdfe11e1..61d75d65683 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -180,6 +180,7 @@ private:
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
+ RuntimeProfile::Counter* _get_split_timer = nullptr;
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
// single slot filter conjuncts
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 38b5250a157..dd1641126bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -41,7 +41,7 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;
@@ -74,10 +74,14 @@ public class LocationPath {
}
private LocationPath(String location) {
- this(location, new HashMap<>());
+ this(location, Collections.emptyMap(), true);
}
public LocationPath(String location, Map<String, String> props) {
+ this(location, props, true);
+ }
+
+ public LocationPath(String location, Map<String, String> props, boolean convertPath) {
String scheme = parseScheme(location).toLowerCase();
if (scheme.isEmpty()) {
locationType = LocationType.NOSCHEME;
@@ -88,7 +92,7 @@ public class LocationPath {
locationType = LocationType.HDFS;
// Need add hdfs host to location
String host = props.get(HdfsResource.DSF_NAMESERVICES);
- this.location = normalizedHdfsPath(location, host);
+ this.location = convertPath ? normalizedHdfsPath(location, host) : location;
break;
case FeConstants.FS_PREFIX_S3:
locationType = LocationType.S3;
@@ -96,22 +100,22 @@ public class LocationPath {
break;
case FeConstants.FS_PREFIX_S3A:
locationType = LocationType.S3A;
- this.location = convertToS3(location);
+ this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_S3N:
// include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
locationType = LocationType.S3N;
- this.location = convertToS3(location);
+ this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_BOS:
locationType = LocationType.BOS;
// use s3 client to access
- this.location = convertToS3(location);
+ this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_GCS:
locationType = LocationType.GCS;
// use s3 client to access
- this.location = convertToS3(location);
+ this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_OSS:
if (isHdfsOnOssEndpoint(location)) {
@@ -119,7 +123,7 @@ public class LocationPath {
this.location = location;
} else {
if (useS3EndPoint(props)) {
- this.location = convertToS3(location);
+ this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -128,7 +132,7 @@ public class LocationPath {
break;
case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
- this.location = convertToS3(location);
+ this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -136,7 +140,7 @@ public class LocationPath {
break;
case FeConstants.FS_PREFIX_OBS:
if (useS3EndPoint(props)) {
- this.location = convertToS3(location);
+ this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -331,7 +335,7 @@ public class LocationPath {
if (location == null || location.isEmpty()) {
return null;
}
- LocationPath locationPath = new LocationPath(location);
+ LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
return locationPath.getTFileTypeForBE();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a72bd709541..5bdbe594059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -792,11 +792,7 @@ public abstract class ExternalCatalog
}
public String bindBrokerName() {
- Map<String, String> properties = catalogProperty.getProperties();
- if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
- return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
- }
- return null;
+ return catalogProperty.getProperties().get(HMSExternalCatalog.BIND_BROKER_NAME);
}
// ATTN: this method only return all cached databases.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 06da3c9d5e1..513fc951672 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -77,6 +77,7 @@ public class ExternalMetaCacheMgr {
private ExecutorService rowCountRefreshExecutor;
private ExecutorService commonRefreshExecutor;
private ExecutorService fileListingExecutor;
+ private ExecutorService scheduleExecutor;
// catalog id -> HiveMetaStoreCache
private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
@@ -109,6 +110,11 @@ public class ExternalMetaCacheMgr {
Config.max_external_cache_loader_thread_pool_size * 1000,
"FileListingExecutor", 10, true);
+ scheduleExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+ Config.max_external_cache_loader_thread_pool_size,
+ Config.max_external_cache_loader_thread_pool_size * 1000,
+ "scheduleExecutor", 10, true);
+
fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
@@ -121,6 +127,10 @@ public class ExternalMetaCacheMgr {
return fileListingExecutor;
}
+ public ExecutorService getScheduleExecutor() {
+ return scheduleExecutor;
+ }
+
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
if (cache == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index ce5e7991515..9039f1a8c58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -317,18 +317,19 @@ public abstract class FileQueryScanNode extends FileScanNode {
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
// Only provide the unique ID of split source to backend.
- SplitAssignment splitAssignment = new SplitAssignment(backendPolicy, this);
+ splitAssignment = new SplitAssignment(
+ backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys);
splitAssignment.init();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
- if (splitAssignment.getCurrentAssignment().isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
+ if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
- inputSplitsNum = splitAssignment.numApproximateSplits();
+ inputSplitsNum = numApproximateSplits();
TFileType locationType;
- FileSplit fileSplit = (FileSplit) splitAssignment.getCurrentAssignment().values().iterator().next();
+ FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
@@ -337,10 +338,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
totalFileSize = fileSplit.getLength() * inputSplitsNum;
// Not accurate, only used to estimate concurrency.
- int numSplitsPerBE = splitAssignment.numApproximateSplits() / backendPolicy.numBackends();
+ int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
for (Backend backend : backendPolicy.getBackends()) {
- SplitSource splitSource = new SplitSource(
- this::splitToScanRange, backend, locationProperties, splitAssignment, pathPartitionKeys);
+ SplitSource splitSource = new SplitSource(backend, splitAssignment);
splitSources.add(splitSource);
Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
TScanRangeLocations curLocations = newLocations();
@@ -582,4 +582,15 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected abstract TableIf getTargetTable() throws UserException;
protected abstract Map<String, String> getLocationProperties() throws UserException;
+
+ @Override
+ public void stop() {
+ if (splitAssignment != null) {
+ splitAssignment.stop();
+ SplitSourceManager manager = Env.getCurrentEnv().getSplitSourceManager();
+ for (Long sourceId : splitAssignment.getSources()) {
+ manager.removeSplitSource(sourceId);
+ }
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 92cdfbcfa1f..bb6865582fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -110,7 +110,11 @@ public abstract class FileScanNode extends ExternalScanNode {
output.append(getRuntimeFilterExplainString(false));
}
- output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
+ output.append(prefix);
+ if (isBatchMode()) {
+ output.append("(approximate)");
+ }
+ output.append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
.append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
.append("\n");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index f41eaba7dd8..928854b91d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -18,68 +18,160 @@
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TScanRangeLocations;
-import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* When file splits are supplied in batch mode, splits are generated lazily and assigned in each call of `getNextBatch`.
* `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends.
*/
public class SplitAssignment {
- // magic number to estimate how many splits are allocated to BE in each batch
- private static final int NUM_SPLITS_PER_BE = 1024;
- // magic number to estimate how many splits are generated of each partition in each batch.
- private static final int NUM_SPLITS_PER_PARTITION = 10;
-
+ private final Set<Long> sources = new HashSet<>();
private final FederationBackendPolicy backendPolicy;
private final SplitGenerator splitGenerator;
- // Store the current assignment of file splits
- private final Multimap<Backend, Split> assignment;
- private final int maxBatchSize;
+ private final ConcurrentHashMap<Backend, BlockingQueue<Collection<TScanRangeLocations>>> assignment
+ = new ConcurrentHashMap<>();
+ private final SplitToScanRange splitToScanRange;
+ private final Map<String, String> locationProperties;
+ private final List<String> pathPartitionKeys;
+ private final Object assignLock = new Object();
+ private Split sampleSplit = null;
+ private final AtomicBoolean isStop = new AtomicBoolean(false);
+ private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
+
+ private UserException exception = null;
- public SplitAssignment(FederationBackendPolicy backendPolicy, SplitGenerator splitGenerator) {
+ public SplitAssignment(
+ FederationBackendPolicy backendPolicy,
+ SplitGenerator splitGenerator,
+ SplitToScanRange splitToScanRange,
+ Map<String, String> locationProperties,
+ List<String> pathPartitionKeys) {
this.backendPolicy = backendPolicy;
this.splitGenerator = splitGenerator;
- this.assignment = ArrayListMultimap.create();
- int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
- maxBatchSize = Math.min(NUM_SPLITS_PER_PARTITION * numPartitions,
- NUM_SPLITS_PER_BE * backendPolicy.numBackends());
+ this.splitToScanRange = splitToScanRange;
+ this.locationProperties = locationProperties;
+ this.pathPartitionKeys = pathPartitionKeys;
}
public void init() throws UserException {
- if (assignment.isEmpty() && splitGenerator.hasNext()) {
- assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
+ splitGenerator.startSplit();
+ synchronized (assignLock) {
+ while (sampleSplit == null && waitFirstSplit()) {
+ try {
+ assignLock.wait(100);
+ } catch (InterruptedException e) {
+ throw new UserException(e.getMessage(), e);
+ }
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ private boolean waitFirstSplit() {
+ return !scheduleFinished.get() && !isStop.get() && exception == null;
+ }
+
+ private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
+ for (Backend backend : batch.keySet()) {
+ Collection<Split> splits = batch.get(backend);
+ List<TScanRangeLocations> locations = new ArrayList<>(splits.size());
+ for (Split split : splits) {
+ locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
+ }
+ if (!assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()).offer(locations)) {
+ throw new UserException("Failed to offer batch split");
+ }
}
}
- public Multimap<Backend, Split> getCurrentAssignment() {
- return assignment;
+ public void registerSource(long uniqueId) {
+ sources.add(uniqueId);
+ }
+
+ public Set<Long> getSources() {
+ return sources;
}
- public int numApproximateSplits() {
- return splitGenerator.numApproximateSplits();
+ public Split getSampleSplit() {
+ return sampleSplit;
}
- public synchronized Collection<Split> getNextBatch(Backend backend) throws UserException {
- // Each call should consume all splits
- Collection<Split> splits = assignment.removeAll(backend);
- while (splits.isEmpty()) {
- // Get the next batch of splits, and assign to backends
- // If there is data skewing, it maybe causes splits to accumulate on some BE
- if (!splitGenerator.hasNext()) {
- return splits;
+ public void addToQueue(List<Split> splits) {
+ if (splits.isEmpty()) {
+ return;
+ }
+ Multimap<Backend, Split> batch = null;
+ synchronized (assignLock) {
+ if (sampleSplit == null) {
+ sampleSplit = splits.get(0);
+ assignLock.notify();
+ }
+ try {
+ batch = backendPolicy.computeScanRangeAssignment(splits);
+ } catch (UserException e) {
+ exception = e;
+ }
+ }
+ if (batch != null) {
+ try {
+ appendBatch(batch);
+ } catch (UserException e) {
+ exception = e;
}
- // todo: In each batch, it's to find the optimal assignment for partial splits,
- // how to solve the global data skew?
- assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
- splits = assignment.removeAll(backend);
+ }
+ }
+
+ private void notifyAssignment() {
+ synchronized (assignLock) {
+ assignLock.notify();
+ }
+ }
+
+ public BlockingQueue<Collection<TScanRangeLocations>> getAssignedSplits(Backend backend) throws UserException {
+ if (exception != null) {
+ throw exception;
+ }
+ BlockingQueue<Collection<TScanRangeLocations>> splits = assignment.computeIfAbsent(backend,
+ be -> new LinkedBlockingQueue<>());
+ if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
+ return null;
}
return splits;
}
+
+ public void setException(UserException e) {
+ exception = e;
+ notifyAssignment();
+ }
+
+ public void finishSchedule() {
+ scheduleFinished.set(true);
+ notifyAssignment();
+ }
+
+ public void stop() {
+ isStop.set(true);
+ notifyAssignment();
+ }
+
+ public boolean isStop() {
+ return isStop.get();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index b819c7f9ef2..c4a373bc85b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -28,27 +28,12 @@ import java.util.List;
* The consumer should call `getNextBatch` to fetch the next batch of splits.
*/
public interface SplitGenerator {
- /**
- * Get the next batch of splits. If the producer(e.g. ScanNode) doesn't support batch mode,
- * should throw user exceptions.
- */
- default List<Split> getNextBatch(int maxBatchSize) throws UserException {
- throw new NotImplementedException("Should implement getNextBatch if in batch mode.");
- }
-
/**
* Get all file splits if the producer doesn't support batch mode.
*/
default List<Split> getSplits() throws UserException {
// todo: remove this interface if batch mode is stable
- throw new NotImplementedException("Scan node sub class need to
implement getSplits interface.");
- }
-
- /**
- * `getNextBatch` should return empty list even if `hasNext` returns false.
- */
- default boolean hasNext() {
- return false;
+ throw new NotImplementedException("Not implement");
}
/**
@@ -65,4 +50,13 @@ public interface SplitGenerator {
default int numApproximateSplits() {
return -1;
}
+
+ default void startSplit() {
+ }
+
+ /**
+ * Close split generator, and stop the split executor
+ */
+ default void stop() {
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
index 74e6aa88ba3..dce135292ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
@@ -18,16 +18,17 @@
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
-import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRangeLocations;
-import java.util.ArrayList;
+import com.google.common.collect.Lists;
+
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -42,28 +43,20 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class SplitSource {
private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
+ private static final long WAIT_TIME_OUT = 100; // 100ms
+ private static final long MAX_WAIT_TIME_OUT = 500; // 500ms
private final long uniqueId;
- private final SplitToScanRange splitToScanRange;
private final Backend backend;
- private final Map<String, String> locationProperties;
- private final List<String> pathPartitionKeys;
private final SplitAssignment splitAssignment;
- private Iterator<Split> splitIterator = null;
- private boolean isLastBatch = false;
+ private final AtomicBoolean isLastBatch;
- public SplitSource(
- SplitToScanRange splitToScanRange,
- Backend backend,
- Map<String, String> locationProperties,
- SplitAssignment splitAssignment,
- List<String> pathPartitionKeys) {
+ public SplitSource(Backend backend, SplitAssignment splitAssignment) {
this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
- this.splitToScanRange = splitToScanRange;
this.backend = backend;
- this.locationProperties = locationProperties;
- this.pathPartitionKeys = pathPartitionKeys;
this.splitAssignment = splitAssignment;
+ this.isLastBatch = new AtomicBoolean(false);
+ splitAssignment.registerSource(uniqueId);
}
public long getUniqueId() {
@@ -73,22 +66,33 @@ public class SplitSource {
/**
* Get the next batch of file splits. If there's no more split, return empty list.
*/
- public synchronized List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
- if (isLastBatch) {
+ public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
+ if (isLastBatch.get()) {
return Collections.emptyList();
}
- List<TScanRangeLocations> scanRanges = new ArrayList<>(maxBatchSize);
- for (int i = 0; i < maxBatchSize; i++) {
- if (splitIterator == null || !splitIterator.hasNext()) {
- Collection<Split> splits = splitAssignment.getNextBatch(backend);
- if (splits.isEmpty()) {
- isLastBatch = true;
- return scanRanges;
+ List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
+ long maxTimeOut = 0;
+ while (scanRanges.size() < maxBatchSize) {
+ BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
+ if (splits == null) {
+ isLastBatch.set(true);
+ break;
+ }
+ while (scanRanges.size() < maxBatchSize) {
+ try {
+ Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
+ if (splitCollection == null) {
+ maxTimeOut += WAIT_TIME_OUT;
+ break;
+ }
+ scanRanges.addAll(splitCollection);
+ } catch (InterruptedException e) {
+ throw new UserException("Failed to get next batch of
splits", e);
}
- splitIterator = splits.iterator();
}
- scanRanges.add(splitToScanRange.getScanRange(
- backend, locationProperties, splitIterator.next(), pathPartitionKeys));
+ if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
+ break;
+ }
}
return scanRanges;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index a22e951be40..b76b4675dee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -30,7 +30,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.util.CacheBulkLoader;
@@ -468,37 +467,39 @@ public class HiveMetaStoreCache {
public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
String bindBrokerName) {
- return getFilesByPartitions(partitions, true, bindBrokerName);
+ return getFilesByPartitions(partitions, true, true, bindBrokerName);
}
public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
String bindBrokerName) {
- return getFilesByPartitions(partitions, false, bindBrokerName);
+ return getFilesByPartitions(partitions, false, true, bindBrokerName);
}
- private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
- boolean withCache, String bindBrokerName) {
+ public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
+ boolean withCache,
+ boolean concurrent,
+ String bindBrokerName) {
long start = System.currentTimeMillis();
- List<FileCacheKey> keys = partitions.stream().map(p -> {
- FileCacheKey fileCacheKey = p.isDummyPartition()
- ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
- p.getInputFormat(), bindBrokerName)
- : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
- return fileCacheKey;
- }).collect(Collectors.toList());
+ List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
+ ? FileCacheKey.createDummyCacheKey(
+ p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), bindBrokerName)
+ : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
+ .collect(Collectors.toList());
List<FileCacheValue> fileLists;
try {
if (withCache) {
- fileLists = fileCacheRef.get().getAll(keys).values().stream().collect(Collectors.toList());
+ fileLists = new ArrayList<>(fileCacheRef.get().getAll(keys).values());
} else {
- List<Pair<FileCacheKey, Future<FileCacheValue>>> pList = keys.stream()
- .map(key -> Pair.of(key, fileListingExecutor.submit(() -> loadFiles(key))))
- .collect(Collectors.toList());
-
- fileLists = Lists.newArrayListWithExpectedSize(keys.size());
- for (Pair<FileCacheKey, Future<FileCacheValue>> p : pList) {
- fileLists.add(p.second.get());
+ if (concurrent) {
+ List<Future<FileCacheValue>> pList = keys.stream().map(
+ key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList());
+ fileLists = Lists.newArrayListWithExpectedSize(keys.size());
+ for (Future<FileCacheValue> p : pList) {
+ fileLists.add(p.get());
+ }
+ } else {
+ fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList());
}
}
} catch (ExecutionException e) {
@@ -810,7 +811,7 @@ public class HiveMetaStoreCache {
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
- properties, bindBrokerName));
+ properties, bindBrokerName));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
@@ -837,7 +838,7 @@ public class HiveMetaStoreCache {
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
- properties, bindBrokerName));
+ properties, bindBrokerName));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 12024c25616..1970a48f2d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
@@ -63,14 +64,17 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class HiveScanNode extends FileQueryScanNode {
@@ -98,9 +102,10 @@ public class HiveScanNode extends FileQueryScanNode {
private SelectedPartitions selectedPartitions = null;
private boolean partitionInit = false;
+ private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
private List<HivePartition> prunedPartitions;
- private Iterator<HivePartition> prunedPartitionsIter;
- private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+ private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
+ private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
/**
* * External file scan node for Query Hive table
@@ -140,7 +145,7 @@ public class HiveScanNode extends FileQueryScanNode {
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
// partitioned table
- boolean isPartitionPruned = selectedPartitions == null ? false : selectedPartitions.isPruned;
+ boolean isPartitionPruned = selectedPartitions != null && selectedPartitions.isPruned;
Collection<PartitionItem> partitionItems;
if (!isPartitionPruned) {
// partitionItems is null means that the partition is not pruned by Nereids,
@@ -232,36 +237,52 @@ public class HiveScanNode extends FileQueryScanNode {
}
@Override
- public List<Split> getNextBatch(int maxBatchSize) throws UserException {
- try {
- HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
- String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
- List<Split> allFiles = Lists.newArrayList();
- int numPartitions = 0;
- while (allFiles.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
- List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
- for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
- partitions.add(prunedPartitionsIter.next());
- numPartitions++;
+ public void startSplit() {
+ if (prunedPartitions.isEmpty()) {
+ splitAssignment.finishSchedule();
+ return;
+ }
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+ Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
+ String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+ AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+ CompletableFuture.runAsync(() -> {
+ for (HivePartition partition : prunedPartitions) {
+ if (batchException.get() != null || splitAssignment.isStop()) {
+ break;
+ }
+ try {
+ splittersOnFlight.acquire();
+ } catch (InterruptedException e) {
+ batchException.set(new UserException(e.getMessage(), e));
+ break;
}
- getFileSplitByPartitions(cache, partitions, allFiles, bindBrokerName);
+ CompletableFuture.runAsync(() -> {
+ try {
+ List<Split> allFiles = Lists.newArrayList();
+ getFileSplitByPartitions(cache, Collections.singletonList(partition), allFiles, bindBrokerName);
+ if (allFiles.size() > numSplitsPerPartition.get()) {
+ numSplitsPerPartition.set(allFiles.size());
+ }
+ splitAssignment.addToQueue(allFiles);
+ } catch (IOException e) {
+ batchException.set(new UserException(e.getMessage(), e));
+ } finally {
+ splittersOnFlight.release();
+ if (batchException.get() != null) {
+ splitAssignment.setException(batchException.get());
+ }
+ if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+ splitAssignment.finishSchedule();
+ }
+ }
+ }, scheduleExecutor);
}
- if (allFiles.size() / numPartitions > numSplitsPerPartition) {
- numSplitsPerPartition = allFiles.size() / numPartitions;
+ if (batchException.get() != null) {
+ splitAssignment.setException(batchException.get());
}
- return allFiles;
- } catch (Throwable t) {
- LOG.warn("get file split failed for table: {}",
hmsTable.getName(), t);
- throw new UserException(
- "get file split failed for table: " + hmsTable.getName() +
", err: " + Util.getRootCauseMessage(t),
- t);
- }
- }
-
- @Override
- public boolean hasNext() {
- return prunedPartitionsIter.hasNext();
+ });
}
@Override
@@ -272,7 +293,6 @@ public class HiveScanNode extends FileQueryScanNode {
} catch (Exception e) {
return false;
}
- prunedPartitionsIter = prunedPartitions.iterator();
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -281,7 +301,7 @@ public class HiveScanNode extends FileQueryScanNode {
@Override
public int numApproximateSplits() {
- return numSplitsPerPartition * prunedPartitions.size();
+ return numSplitsPerPartition.get() * prunedPartitions.size();
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
@@ -290,7 +310,8 @@ public class HiveScanNode extends FileQueryScanNode {
if (hiveTransaction != null) {
fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
} else {
- fileCaches = cache.getFilesByPartitionsWithCache(partitions, bindBrokerName);
+ boolean withCache = Config.max_external_file_cache_num > 0;
+ fileCaches = cache.getFilesByPartitions(partitions, withCache, withCache, bindBrokerName);
}
if (tableSample != null) {
List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
@@ -463,10 +484,7 @@ public class HiveScanNode extends FileQueryScanNode {
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
String aggFunctionName = aggExpr.getFnName().getFunction();
- if (aggFunctionName.equalsIgnoreCase("COUNT")) {
- return true;
- }
- return false;
+ return aggFunctionName.equalsIgnoreCase("COUNT");
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 8f2b3e598b9..82e21bcdd17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -69,13 +69,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -101,9 +103,11 @@ public class HudiScanNode extends HiveScanNode {
private HoodieTimeline timeline;
private Option<String> snapshotTimestamp;
private String queryInstant;
+
+ private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
private List<HivePartition> prunedPartitions;
- private Iterator<HivePartition> prunedPartitionsIter;
- private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+ private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
+ private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
private boolean incrementalRead = false;
private TableScanParams scanParams;
@@ -206,7 +210,6 @@ public class HudiScanNode extends HiveScanNode {
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
prunedPartitions = Collections.emptyList();
- prunedPartitionsIter = prunedPartitions.iterator();
partitionInit = true;
return;
}
@@ -320,47 +323,47 @@ public class HudiScanNode extends HiveScanNode {
incrementalRelation.getEndTs())).collect(Collectors.toList());
}
+ private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
+ String globPath;
+ String partitionName;
+ if (partition.isDummyPartition()) {
+ partitionName = "";
+ globPath = hudiClient.getBasePathV2().toString() + "/*";
+ } else {
+ partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+ new Path(partition.getPath()));
+ globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+ }
+ List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(
+ hudiClient.getRawFs(), new Path(globPath));
+ HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+ timeline, statuses.toArray(new FileStatus[0]));
+
+ if (isCowOrRoTable) {
+ fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+ noLogsSplitNum.incrementAndGet();
+ String filePath = baseFile.getPath();
+ long fileSize = baseFile.getFileSize();
+ // Need add hdfs host to location
+ LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+ Path splitFilePath = locationPath.toStorageLocation();
+ splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+ new String[0], partition.getPartitionValues()));
+ });
+ } else {
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+ .forEach(fileSlice -> splits.add(
+ generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
+ }
+ }
+
private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
AtomicReference<Throwable> throwable = new AtomicReference<>();
partitions.forEach(partition -> executor.execute(() -> {
try {
- String globPath;
- String partitionName = "";
- if (partition.isDummyPartition()) {
- globPath = hudiClient.getBasePathV2().toString() + "/*";
- } else {
- partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
- new Path(partition.getPath()));
- globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
- }
- List<FileStatus> statuses;
- try {
- statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
- new Path(globPath));
- } catch (IOException e) {
- throw new RuntimeException("Failed to get hudi file
statuses on path: " + globPath, e);
- }
- HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
- timeline, statuses.toArray(new FileStatus[0]));
-
- if (isCowOrRoTable) {
- fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
- noLogsSplitNum.incrementAndGet();
- String filePath = baseFile.getPath();
- long fileSize = baseFile.getFileSize();
- // Need add hdfs host to location
- LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
- Path splitFilePath = locationPath.toStorageLocation();
- splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
- new String[0], partition.getPartitionValues()));
- });
- } else {
- fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
- .forEach(fileSlice -> splits.add(
- generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
- }
+ getPartitionSplits(partition, splits);
} catch (Throwable t) {
throwable.set(t);
} finally {
@@ -394,26 +397,48 @@ public class HudiScanNode extends HiveScanNode {
}
@Override
- public List<Split> getNextBatch(int maxBatchSize) throws UserException {
- List<Split> splits = Collections.synchronizedList(new ArrayList<>());
- int numPartitions = 0;
- while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
- List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
- for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
- partitions.add(prunedPartitionsIter.next());
- numPartitions++;
- }
- getPartitionSplits(partitions, splits);
+ public void startSplit() {
+ if (prunedPartitions.isEmpty()) {
+ splitAssignment.finishSchedule();
+ return;
}
- if (splits.size() / numPartitions > numSplitsPerPartition) {
- numSplitsPerPartition = splits.size() / numPartitions;
- }
- return splits;
- }
-
- @Override
- public boolean hasNext() {
- return prunedPartitionsIter.hasNext();
+ AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+ CompletableFuture.runAsync(() -> {
+ for (HivePartition partition : prunedPartitions) {
+ if (batchException.get() != null || splitAssignment.isStop()) {
+ break;
+ }
+ try {
+ splittersOnFlight.acquire();
+ } catch (InterruptedException e) {
+ batchException.set(new UserException(e.getMessage(), e));
+ break;
+ }
+ CompletableFuture.runAsync(() -> {
+ try {
+ List<Split> allFiles = Lists.newArrayList();
+ getPartitionSplits(partition, allFiles);
+ if (allFiles.size() > numSplitsPerPartition.get()) {
+ numSplitsPerPartition.set(allFiles.size());
+ }
+ splitAssignment.addToQueue(allFiles);
+ } catch (IOException e) {
+ batchException.set(new UserException(e.getMessage(), e));
+ } finally {
+ splittersOnFlight.release();
+ if (batchException.get() != null) {
+ splitAssignment.setException(batchException.get());
+ }
+ if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+ splitAssignment.finishSchedule();
+ }
+ }
+ });
+ }
+ if (batchException.get() != null) {
+ splitAssignment.setException(batchException.get());
+ }
+ });
}
@Override
@@ -426,7 +451,6 @@ public class HudiScanNode extends HiveScanNode {
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
- prunedPartitionsIter = prunedPartitions.iterator();
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -435,7 +459,7 @@ public class HudiScanNode extends HiveScanNode {
@Override
public int numApproximateSplits() {
- return numSplitsPerPartition * prunedPartitions.size();
+ return numSplitsPerPartition.get() * prunedPartitions.size();
}
private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a11f3300b9d..9c896bd7504 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -44,6 +44,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FederationBackendPolicy;
+import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
@@ -84,7 +85,7 @@ import java.util.stream.IntStream;
public abstract class ScanNode extends PlanNode implements SplitGenerator {
private static final Logger LOG = LogManager.getLogger(ScanNode.class);
protected static final int NUM_SPLITS_PER_PARTITION = 10;
- protected static final int NUM_PARTITIONS_PER_LOOP = 100;
+ protected static final int NUM_SPLITTERS_ON_FLIGHT = Config.max_external_cache_loader_thread_pool_size;
protected final TupleDescriptor desc;
// for distribution prunner
protected Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
@@ -95,6 +96,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
protected List<SplitSource> splitSources = Lists.newArrayList();
protected PartitionInfo partitionsInfo = null;
+ protected SplitAssignment splitAssignment = null;
// create a mapping between output slot's id and project expr
Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 83acbba5d16..c6755746106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -634,6 +634,9 @@ public class Coordinator implements CoordInterface {
@Override
public void close() {
+ for (ScanNode scanNode : scanNodes) {
+ scanNode.stop();
+ }
if (queryQueue != null && queueToken != null) {
try {
queryQueue.releaseAndNotify(queueToken);
@@ -1208,6 +1211,9 @@ public class Coordinator implements CoordInterface {
@Override
public void cancel(Status cancelReason) {
+ for (ScanNode scanNode : scanNodes) {
+ scanNode.stop();
+ }
if (cancelReason.ok()) {
throw new RuntimeException("Should use correct cancel reason, but
it is "
+ cancelReason.toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]