This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a6c03c1c838 [improvement](iceberg)Parallelize splits for count(*) (#41169)
a6c03c1c838 is described below
commit a6c03c1c838bee462b5eb1c95e90c8d246a55536
Author: wuwenchi <[email protected]>
AuthorDate: Sun Oct 13 20:05:54 2024 +0800
[improvement](iceberg)Parallelize splits for count(*) (#41169)
## Proposed changes
1. Parallelize splits to prevent bottlenecks in a single split.
2. Only a single column needs to be resized.
---
be/src/vec/exec/format/table/iceberg_reader.cpp | 10 ++++--
be/src/vec/exec/format/table/iceberg_reader.h | 13 ++++---
be/src/vec/exec/scan/vfile_scanner.cpp | 8 ++---
.../datasource/iceberg/source/IcebergScanNode.java | 40 +++++++++++++++++-----
.../datasource/iceberg/source/IcebergSplit.java | 9 +++++
gensrc/thrift/PlanNodes.thrift | 1 +
6 files changed, 59 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 424ef5d7bf4..295a3a40544 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -76,15 +76,14 @@
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
RuntimeProfile* profile, RuntimeState*
state,
const TFileScanRangeParams& params,
const TFileRangeDesc& range,
ShardedKVCache* kv_cache,
- io::IOContext* io_ctx, int64_t
push_down_count)
+ io::IOContext* io_ctx)
: TableFormatReader(std::move(file_format_reader)),
_profile(profile),
_state(state),
_params(params),
_range(range),
_kv_cache(kv_cache),
- _io_ctx(io_ctx),
- _remaining_push_down_count(push_down_count) {
+ _io_ctx(io_ctx) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
@@ -95,6 +94,11 @@
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
+ if (range.table_format_params.iceberg_params.__isset.row_count) {
+ _remaining_push_down_count =
range.table_format_params.iceberg_params.row_count;
+ } else {
+ _remaining_push_down_count = -1;
+ }
}
Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows,
bool* eof) {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index a4ecbe9e360..04f64aad518 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -76,8 +76,8 @@ public:
IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx,
- int64_t push_down_count);
+ const TFileRangeDesc& range, ShardedKVCache* kv_cache,
+ io::IOContext* io_ctx);
~IcebergTableReader() override = default;
Status init_row_filters(const TFileRangeDesc& range, io::IOContext*
io_ctx) final;
@@ -197,9 +197,9 @@ public:
IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams&
params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
- io::IOContext* io_ctx, int64_t push_down_count)
+ io::IOContext* io_ctx)
: IcebergTableReader(std::move(file_format_reader), profile,
state, params, range,
- kv_cache, io_ctx, push_down_count) {}
+ kv_cache, io_ctx) {}
Status init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
@@ -237,10 +237,9 @@ public:
IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx,
- int64_t push_down_count)
+ const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx)
: IcebergTableReader(std::move(file_format_reader), profile,
state, params, range,
- kv_cache, io_ctx, push_down_count) {}
+ kv_cache, io_ctx) {}
void set_delete_rows() override {
auto* orc_reader = (OrcReader*)_file_format_reader.get();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index e9681a5df9e..4fc58177c8a 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -842,7 +842,7 @@ Status VFileScanner::_get_next_reader() {
std::unique_ptr<IcebergParquetReader> iceberg_reader =
IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params,
range, _kv_cache,
- _io_ctx.get(),
_get_push_down_count());
+ _io_ctx.get());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map,
_colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
@@ -912,9 +912,9 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(tran_orc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type ==
"iceberg") {
- std::unique_ptr<IcebergOrcReader> iceberg_reader =
IcebergOrcReader::create_unique(
- std::move(orc_reader), _profile, _state, *_params,
range, _kv_cache,
- _io_ctx.get(), _get_push_down_count());
+ std::unique_ptr<IcebergOrcReader> iceberg_reader =
+ IcebergOrcReader::create_unique(std::move(orc_reader),
_profile, _state,
+ *_params, range,
_kv_cache, _io_ctx.get());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map,
_colname_to_value_range,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 2ca51298fe6..fe6c54cf53b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -36,6 +36,7 @@ import
org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -86,6 +87,8 @@ public class IcebergScanNode extends FileQueryScanNode {
private IcebergSource source;
private Table icebergTable;
private List<String> pushdownIcebergPredicates = Lists.newArrayList();
+ private boolean pushDownCount = false;
+ private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
/**
* External file scan node for Query iceberg table
@@ -137,6 +140,9 @@ public class IcebergScanNode extends FileQueryScanNode {
int formatVersion = icebergSplit.getFormatVersion();
fileDesc.setFormatVersion(formatVersion);
fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
+ if (pushDownCount) {
+ fileDesc.setRowCount(icebergSplit.getRowCount());
+ }
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
@@ -255,9 +261,24 @@ public class IcebergScanNode extends FileQueryScanNode {
}
TPushAggOp aggOp = getPushDownAggNoGroupingOp();
- if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() >= 0) {
+ if (aggOp.equals(TPushAggOp.COUNT)) {
// we can create a special empty split and skip the plan process
- return splits.isEmpty() ? splits :
Collections.singletonList(splits.get(0));
+ if (splits.isEmpty()) {
+ return splits;
+ }
+ long countFromSnapshot = getCountFromSnapshot();
+ if (countFromSnapshot >= 0) {
+ pushDownCount = true;
+ List<Split> pushDownCountSplits;
+ if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
+ int parallelNum =
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ pushDownCountSplits = splits.subList(0,
Math.min(splits.size(), parallelNum));
+ } else {
+ pushDownCountSplits =
Collections.singletonList(splits.get(0));
+ }
+ assignCountToSplits(pushDownCountSplits, countFromSnapshot);
+ return pushDownCountSplits;
+ }
}
selectedPartitionNum = partitionPathSet.size();
@@ -374,12 +395,6 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
protected void toThrift(TPlanNode planNode) {
super.toThrift(planNode);
- if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
- long countFromSnapshot = getCountFromSnapshot();
- if (countFromSnapshot >= 0) {
- planNode.setPushDownCount(countFromSnapshot);
- }
- }
}
@Override
@@ -399,4 +414,13 @@ public class IcebergScanNode extends FileQueryScanNode {
return super.getNodeExplainString(prefix, detailLevel)
+ String.format("%sicebergPredicatePushdown=\n%s\n", prefix,
sb);
}
+
+ private void assignCountToSplits(List<Split> splits, long totalCount) {
+ int size = splits.size();
+ long countPerSplit = totalCount / size;
+ for (int i = 0; i < size - 1; i++) {
+ ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit);
+ }
+ ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit +
totalCount % size);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 8549e96bc2e..46e8f96ba35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -37,6 +37,7 @@ public class IcebergSplit extends FileSplit {
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
private Map<String, String> config;
+ private long rowCount = -1;
// File path will be changed if the file is modified, so there's no need
to get modification time.
public IcebergSplit(LocationPath file, long start, long length, long
fileLength, String[] hosts,
@@ -47,4 +48,12 @@ public class IcebergSplit extends FileSplit {
this.config = config;
this.originalPath = originalPath;
}
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(long rowCount) {
+ this.rowCount = rowCount;
+ }
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index dcabb4ef5e5..5c0273da791 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -309,6 +309,7 @@ struct TIcebergFileDesc {
// Deprecated
5: optional Exprs.TExpr file_select_conjunct;
6: optional string original_file_path;
+ 7: optional i64 row_count;
}
struct TPaimonDeletionFileDesc {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]