This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new c03c92aab51 [Pick](branch-2.0) pick some PRs about point query
optimization (#28611)
c03c92aab51 is described below
commit c03c92aab51420a7083c7bfeaef0479912d56d27
Author: lihangyu <[email protected]>
AuthorDate: Tue Dec 19 14:26:47 2023 +0800
[Pick](branch-2.0) pick some PRs about point query optimization (#28611)
* [Fix](point query) fix memleak by increasing `scanReplicaIds` when using
prepared statement (#28184)
* [Performance](point query) Optimize partition prune for point query (#28150)
* [Improve](sort) avoid too many tmp vectors for get_columns (#27734)
---
be/src/vec/common/sort/heap_sorter.cpp | 3 +-
.../org/apache/doris/planner/OlapScanNode.java | 18 +++-
.../PartitionPruneV2ForShortCircuitPlan.java | 90 ++++++++++++++++
.../doris/planner/PartitionPrunerV2Base.java | 7 ++
.../java/org/apache/doris/planner/RangeMap.java | 73 +++++++++++++
.../java/org/apache/doris/qe/PointQueryExec.java | 7 ++
.../point_query_p0/test_point_query_partition.out | 33 ++++++
.../test_point_query_partition.groovy | 117 +++++++++++++++++++++
8 files changed, 346 insertions(+), 2 deletions(-)
diff --git a/be/src/vec/common/sort/heap_sorter.cpp
b/be/src/vec/common/sort/heap_sorter.cpp
index 71c66b9cd00..d702cbaca20 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -147,8 +147,9 @@ Status HeapSorter::prepare_for_read() {
for (int i = capacity - 1; i >= 0; i--) {
auto rid = vector_to_reverse[i].row_id();
const auto cur_block = vector_to_reverse[i].block();
+ Columns columns = cur_block->get_columns();
for (size_t j = 0; j < num_columns; ++j) {
- result_columns[j]->insert_from(*(cur_block->get_columns()[j]),
rid);
+ result_columns[j]->insert_from(*(columns[j]), rid);
}
}
_return_block =
vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 13c83226235..11ec7b7f2d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -28,6 +28,7 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
@@ -198,6 +199,11 @@ public class OlapScanNode extends ScanNode {
private boolean shouldColoScan = false;
+ // cached for prepared statement to quickly prune partition
+ // only used in short circuit plan at present
+ private final PartitionPruneV2ForShortCircuitPlan cachedPartitionPruner =
+ new PartitionPruneV2ForShortCircuitPlan();
+
// Constructs node to scan given data files of table 'tbl'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName) {
super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
@@ -669,8 +675,17 @@ public class OlapScanNode extends ScanNode {
} else {
keyItemMap = partitionInfo.getIdToItem(false);
}
-
if (partitionInfo.getType() == PartitionType.RANGE) {
+ if (isPointQuery() && partitionInfo.getPartitionColumns().size()
== 1) {
+ // short circuit, a quick path to find partition
+ ColumnRange filterRange =
columnNameToRange.get(partitionInfo.getPartitionColumns().get(0).getName());
+ LiteralExpr lowerBound =
filterRange.getRangeSet().get().asRanges().stream()
+ .findFirst().get().lowerEndpoint().getValue();
+ LiteralExpr upperBound =
filterRange.getRangeSet().get().asRanges().stream()
+ .findFirst().get().upperEndpoint().getValue();
+ cachedPartitionPruner.update(keyItemMap);
+ return cachedPartitionPruner.prune(lowerBound, upperBound);
+ }
partitionPruner = new RangePartitionPrunerV2(keyItemMap,
partitionInfo.getPartitionColumns(), columnNameToRange);
} else if (partitionInfo.getType() == PartitionType.LIST) {
@@ -1143,6 +1158,7 @@ public class OlapScanNode extends ScanNode {
scanBackendIds.clear();
scanTabletIds.clear();
bucketSeq2locations.clear();
+ scanReplicaIds.clear();
try {
createScanRangeLocations();
} catch (AnalysisException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
new file mode 100644
index 00000000000..8d39ed4d4fc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.collect.Range;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base
{
+ private static final Logger LOG =
LogManager.getLogger(PartitionPruneV2ForShortCircuitPlan.class);
+ // map to record literal range to find specific partition
+ private RangeMap<LiteralExpr, Long> partitionRangeMapByLiteral = new
RangeMap<>();
+ // last timestamp partitionRangeMapByLiteral updated
+ private long lastPartitionRangeMapUpdateTimestampMs = 0;
+
+ PartitionPruneV2ForShortCircuitPlan() {
+ super();
+ }
+
+ public boolean update(Map<Long, PartitionItem> keyItemMap) {
+ // interval to update partitionRangeMapByLiteral
+ long partitionRangeMapUpdateIntervalS = 10;
+ if (System.currentTimeMillis() - lastPartitionRangeMapUpdateTimestampMs
+ > partitionRangeMapUpdateIntervalS * 1000) {
+ partitionRangeMapByLiteral = new RangeMap<>();
+ // recalculate map
+ for (Entry<Long, PartitionItem> entry : keyItemMap.entrySet()) {
+ Range<PartitionKey> range = entry.getValue().getItems();
+ LiteralExpr partitionLowerBound = (LiteralExpr)
range.lowerEndpoint().getKeys().get(0);
+ LiteralExpr partitionUpperBound = (LiteralExpr)
range.upperEndpoint().getKeys().get(0);
+ Range<LiteralExpr> partitionRange =
Range.closedOpen(partitionLowerBound, partitionUpperBound);
+ partitionRangeMapByLiteral.put(partitionRange, entry.getKey());
+ }
+ LOG.debug("update partitionRangeMapByLiteral");
+ this.lastPartitionRangeMapUpdateTimestampMs =
System.currentTimeMillis();
+ return true;
+ }
+ return false;
+ }
+
+ public Collection<Long> prune(LiteralExpr lowerBound, LiteralExpr
upperBound) throws AnalysisException {
+ Range<LiteralExpr> filterRangeValue = Range.closed(lowerBound,
upperBound);
+ return
partitionRangeMapByLiteral.getOverlappingRangeValues(filterRangeValue);
+ }
+
+ @Override
+ public Collection<Long> prune() throws AnalysisException {
+ throw new AnalysisException("Not implemented");
+ }
+
+ @Override
+ void genSingleColumnRangeMap() {
+ }
+
+ @Override
+ FinalFilters getFinalFilters(ColumnRange columnRange,
+ Column column) throws AnalysisException {
+ throw new AnalysisException("Not implemented");
+ }
+
+ @Override
+ Collection<Long> pruneMultipleColumnPartition(Map<Column, FinalFilters>
columnToFilters) throws AnalysisException {
+ throw new AnalysisException("Not implemented");
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
index 376e2a4c7f0..1d9f163ca80 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
@@ -48,6 +48,13 @@ public abstract class PartitionPrunerV2Base implements
PartitionPruner {
// currently only used for list partition
private Map.Entry<Long, PartitionItem> defaultPartition;
+ // Only called in PartitionPruneV2ForShortCircuitPlan constructor
+ PartitionPrunerV2Base() {
+ this.idToPartitionItem = null;
+ this.partitionColumns = null;
+ this.columnNameToRange = null;
+ }
+
public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
List<Column> partitionColumns,
Map<String, ColumnRange> columnNameToRange) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java
new file mode 100644
index 00000000000..c8fd07f940d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import com.google.common.collect.Range;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class RangeMap<C extends Comparable<C>, V> {
+
+ private final NavigableMap<Range<C>, V> rangeMap = new TreeMap<>(new
RangeComparator<C>());
+
+ public void put(Range<C> range, V value) {
+ rangeMap.put(range, value);
+ }
+
+ public List<V> getOverlappingRangeValues(Range<C> searchRange) {
+ return getOverlappingRanges(searchRange).stream()
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ }
+
+ public List<Map.Entry<Range<C>, V>> getOverlappingRanges(Range<C>
searchRange) {
+ List<Map.Entry<Range<C>, V>> overlappingRanges = new ArrayList<>();
+
+ // Find the possible starting point for the search
+ Map.Entry<Range<C>, V> floorEntry = rangeMap.floorEntry(searchRange);
+ Map.Entry<Range<C>, V> ceilingEntry =
rangeMap.ceilingEntry(searchRange);
+
+ // Start iterating from the earlier of the floor or ceiling entry
+ Map.Entry<Range<C>, V> startEntry = (floorEntry != null) ? floorEntry
: ceilingEntry;
+ if (startEntry == null) {
+ return overlappingRanges;
+ }
+
+ for (Map.Entry<Range<C>, V> entry :
rangeMap.tailMap(startEntry.getKey()).entrySet()) {
+ if
(entry.getKey().lowerEndpoint().compareTo(searchRange.upperEndpoint()) > 0) {
+ break; // No more overlapping ranges possible
+ }
+ if (entry.getKey().isConnected(searchRange) &&
!entry.getKey().intersection(searchRange).isEmpty()) {
+ overlappingRanges.add(entry);
+ }
+ }
+ return overlappingRanges;
+ }
+
+ private static class RangeComparator<C extends Comparable<C>> implements
java.util.Comparator<Range<C>> {
+ @Override
+ public int compare(Range<C> r1, Range<C> r2) {
+ return r1.lowerEndpoint().compareTo(r2.lowerEndpoint());
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index 0ffb5b989d8..0d18b2f08c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -121,6 +121,9 @@ public class PointQueryExec implements CoordInterface {
OlapScanNode planRoot = getPlanRoot();
// compute scan range
List<TScanRangeLocations> locations =
planRoot.lazyEvaluateRangeLocations();
+ if (planRoot.getScanTabletIds().isEmpty()) {
+ return;
+ }
Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
this.tabletID = planRoot.getScanTabletIds().get(0);
@@ -167,6 +170,10 @@ public class PointQueryExec implements CoordInterface {
@Override
public RowBatch getNext() throws Exception {
setScanRangeLocations();
+ // No partition/tablet found, return empty row batch
+ if (candidateBackends == null || candidateBackends.isEmpty()) {
+ return new RowBatch();
+ }
Iterator<Backend> backendIter = candidateBackends.iterator();
RowBatch rowBatch = null;
int tryCount = 0;
diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out
b/regression-test/data/point_query_p0/test_point_query_partition.out
new file mode 100644
index 00000000000..cd22e6c93ec
--- /dev/null
+++ b/regression-test/data/point_query_p0/test_point_query_partition.out
@@ -0,0 +1,33 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !point_select --
+1 a
+
+-- !point_select --
+2 b
+
+-- !point_select --
+11 d
+
+-- !point_select --
+-1 c
+
+-- !point_select --
+11 d
+
+-- !point_select --
+
+-- !point_select --
+
+-- !point_select --
+33 f
+
+-- !point_select --
+45 g
+
+-- !point_select --
+
+-- !point_select --
+999 h
+
+-- !point_select --
+
diff --git
a/regression-test/suites/point_query_p0/test_point_query_partition.groovy
b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
new file mode 100644
index 00000000000..7b5966db0c1
--- /dev/null
+++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.math.BigDecimal;
+
+suite("test_point_query_partition") {
+ def user = context.config.jdbcUser
+ def password = context.config.jdbcPassword
+ def realDb = "regression_test_serving_p0"
+ def tableName = realDb + ".tbl_point_query_partition"
+ sql "CREATE DATABASE IF NOT EXISTS ${realDb}"
+
+ // Parse url
+ String jdbcUrl = context.config.jdbcUrl
+ String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+ def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+ def sql_port
+ if (urlWithoutSchema.indexOf("/") >= 0) {
+ // e.g: jdbc:mysql://localhost:8080/?a=b
+ sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") +
1, urlWithoutSchema.indexOf("/"))
+ } else {
+ // e.g: jdbc:mysql://localhost:8080
+ sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") +
1)
+ }
+ // set server side prepared statement url
+ def prepare_url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb
+ "?&useServerPrepStmts=true"
+
+ def generateString = {len ->
+ def str = ""
+ for (int i = 0; i < len; i++) {
+ str += "a"
+ }
+ return str
+ }
+
+ def nprep_sql = { sql_str ->
+ def url_without_prep = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/"
+ realDb
+ connect(user = user, password = password, url = url_without_prep) {
+ sql sql_str
+ }
+ }
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(11) NULL COMMENT "",
+ `value` text NULL COMMENT ""
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k1`)
+ PARTITION BY RANGE(`k1`)
+ (
+ PARTITION `p1` VALUES LESS THAN ("1"),
+ PARTITION `p2` VALUES LESS THAN ("10"),
+ PARTITION `p3` VALUES LESS THAN ("30"),
+ PARTITION `p4` VALUES LESS THAN ("40"),
+ PARTITION `p5` VALUES LESS THAN ("1000")
+ )
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "store_row_column" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true",
+ "storage_format" = "V2")
+ """
+
+ sql """INSERT INTO ${tableName} VALUES (1, 'a')"""
+ sql """INSERT INTO ${tableName} VALUES (2, 'b')"""
+ sql """INSERT INTO ${tableName} VALUES (-1, 'c')"""
+ sql """INSERT INTO ${tableName} VALUES (11, 'd')"""
+ sql """INSERT INTO ${tableName} VALUES (15, 'e')"""
+ sql """INSERT INTO ${tableName} VALUES (33, 'f')"""
+ sql """INSERT INTO ${tableName} VALUES (45, 'g')"""
+ sql """INSERT INTO ${tableName} VALUES (999, 'h')"""
+ def result1 = connect(user=user, password=password, url=prepare_url) {
+ def stmt = prepareStatement "select * from ${tableName} where k1 = ?"
+ assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+ stmt.setInt(1, 1)
+ qe_point_select stmt
+ stmt.setInt(1, 2)
+ qe_point_select stmt
+ stmt.setInt(1, 11)
+ qe_point_select stmt
+ stmt.setInt(1, -1)
+ qe_point_select stmt
+ stmt.setInt(1, 11)
+ qe_point_select stmt
+ stmt.setInt(1, 12)
+ qe_point_select stmt
+ stmt.setInt(1, 34)
+ qe_point_select stmt
+ stmt.setInt(1, 33)
+ qe_point_select stmt
+ stmt.setInt(1, 45)
+ qe_point_select stmt
+ stmt.setInt(1, 666)
+ qe_point_select stmt
+ stmt.setInt(1, 999)
+ qe_point_select stmt
+ stmt.setInt(1, 1000)
+ qe_point_select stmt
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]