This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c03c92aab51 [Pick](branch-2.0) pick some PRs about point query 
optimization (#28611)
c03c92aab51 is described below

commit c03c92aab51420a7083c7bfeaef0479912d56d27
Author: lihangyu <[email protected]>
AuthorDate: Tue Dec 19 14:26:47 2023 +0800

    [Pick](branch-2.0) pick some PRs about point query optimization (#28611)
    
    * [Fix](point query) fix memleak caused by ever-growing `scanReplicaIds` when using 
prepared statement (#28184)
    * [Performance](point query) Optimize partition prune for point query (#28150)
    * [Improve](sort) avoid too many tmp vectors for get_columns (#27734)
---
 be/src/vec/common/sort/heap_sorter.cpp             |   3 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  18 +++-
 .../PartitionPruneV2ForShortCircuitPlan.java       |  90 ++++++++++++++++
 .../doris/planner/PartitionPrunerV2Base.java       |   7 ++
 .../java/org/apache/doris/planner/RangeMap.java    |  73 +++++++++++++
 .../java/org/apache/doris/qe/PointQueryExec.java   |   7 ++
 .../point_query_p0/test_point_query_partition.out  |  33 ++++++
 .../test_point_query_partition.groovy              | 117 +++++++++++++++++++++
 8 files changed, 346 insertions(+), 2 deletions(-)

diff --git a/be/src/vec/common/sort/heap_sorter.cpp 
b/be/src/vec/common/sort/heap_sorter.cpp
index 71c66b9cd00..d702cbaca20 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -147,8 +147,9 @@ Status HeapSorter::prepare_for_read() {
         for (int i = capacity - 1; i >= 0; i--) {
             auto rid = vector_to_reverse[i].row_id();
             const auto cur_block = vector_to_reverse[i].block();
+            Columns columns = cur_block->get_columns();
             for (size_t j = 0; j < num_columns; ++j) {
-                result_columns[j]->insert_from(*(cur_block->get_columns()[j]), 
rid);
+                result_columns[j]->insert_from(*(columns[j]), rid);
             }
         }
         _return_block = 
vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 13c83226235..11ec7b7f2d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -28,6 +28,7 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
@@ -198,6 +199,11 @@ public class OlapScanNode extends ScanNode {
 
     private boolean shouldColoScan = false;
 
+    // cached for prepared statement to quickly prune partition
+    // only used in short circuit plan at present
+    private final PartitionPruneV2ForShortCircuitPlan cachedPartitionPruner =
+                        new PartitionPruneV2ForShortCircuitPlan();
+
     // Constructs node to scan given data files of table 'tbl'.
     public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName) {
         super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
@@ -669,8 +675,17 @@ public class OlapScanNode extends ScanNode {
         } else {
             keyItemMap = partitionInfo.getIdToItem(false);
         }
-
         if (partitionInfo.getType() == PartitionType.RANGE) {
+            if (isPointQuery() && partitionInfo.getPartitionColumns().size() 
== 1) {
+                // short circuit, a quick path to find partition
+                ColumnRange filterRange = 
columnNameToRange.get(partitionInfo.getPartitionColumns().get(0).getName());
+                LiteralExpr lowerBound = 
filterRange.getRangeSet().get().asRanges().stream()
+                        .findFirst().get().lowerEndpoint().getValue();
+                LiteralExpr upperBound = 
filterRange.getRangeSet().get().asRanges().stream()
+                        .findFirst().get().upperEndpoint().getValue();
+                cachedPartitionPruner.update(keyItemMap);
+                return cachedPartitionPruner.prune(lowerBound, upperBound);
+            }
             partitionPruner = new RangePartitionPrunerV2(keyItemMap,
                     partitionInfo.getPartitionColumns(), columnNameToRange);
         } else if (partitionInfo.getType() == PartitionType.LIST) {
@@ -1143,6 +1158,7 @@ public class OlapScanNode extends ScanNode {
         scanBackendIds.clear();
         scanTabletIds.clear();
         bucketSeq2locations.clear();
+        scanReplicaIds.clear();
         try {
             createScanRangeLocations();
         } catch (AnalysisException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
new file mode 100644
index 00000000000..8d39ed4d4fc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.collect.Range;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base 
{
+    private static final Logger LOG = 
LogManager.getLogger(PartitionPruneV2ForShortCircuitPlan.class);
+    // map to record literal range to find specific partition
+    private RangeMap<LiteralExpr, Long> partitionRangeMapByLiteral = new 
RangeMap<>();
+    // last timestamp partitionRangeMapByLiteral updated
+    private long lastPartitionRangeMapUpdateTimestampMs = 0;
+
+    PartitionPruneV2ForShortCircuitPlan() {
+        super();
+    }
+
+    public boolean update(Map<Long, PartitionItem> keyItemMap) {
+        // interval to update partitionRangeMapByLiteral
+        long partitionRangeMapUpdateIntervalS = 10;
+        if (System.currentTimeMillis() - lastPartitionRangeMapUpdateTimestampMs
+                    > partitionRangeMapUpdateIntervalS * 1000) {
+            partitionRangeMapByLiteral = new RangeMap<>();
+            // recalculate map
+            for (Entry<Long, PartitionItem> entry : keyItemMap.entrySet()) {
+                Range<PartitionKey> range = entry.getValue().getItems();
+                LiteralExpr partitionLowerBound = (LiteralExpr) 
range.lowerEndpoint().getKeys().get(0);
+                LiteralExpr partitionUpperBound = (LiteralExpr) 
range.upperEndpoint().getKeys().get(0);
+                Range<LiteralExpr> partitionRange = 
Range.closedOpen(partitionLowerBound, partitionUpperBound);
+                partitionRangeMapByLiteral.put(partitionRange, entry.getKey());
+            }
+            LOG.debug("update partitionRangeMapByLiteral");
+            this.lastPartitionRangeMapUpdateTimestampMs = 
System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    public Collection<Long> prune(LiteralExpr lowerBound, LiteralExpr 
upperBound) throws AnalysisException {
+        Range<LiteralExpr> filterRangeValue = Range.closed(lowerBound, 
upperBound);
+        return 
partitionRangeMapByLiteral.getOverlappingRangeValues(filterRangeValue);
+    }
+
+    @Override
+    public Collection<Long> prune() throws AnalysisException {
+        throw new AnalysisException("Not implemented");
+    }
+
+    @Override
+    void genSingleColumnRangeMap() {
+    }
+
+    @Override
+    FinalFilters getFinalFilters(ColumnRange columnRange,
+            Column column) throws AnalysisException {
+        throw new AnalysisException("Not implemented");
+    }
+
+    @Override
+    Collection<Long> pruneMultipleColumnPartition(Map<Column, FinalFilters> 
columnToFilters) throws AnalysisException {
+        throw new AnalysisException("Not implemented");
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
index 376e2a4c7f0..1d9f163ca80 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
@@ -48,6 +48,13 @@ public abstract class PartitionPrunerV2Base implements 
PartitionPruner {
     // currently only used for list partition
     private Map.Entry<Long, PartitionItem> defaultPartition;
 
+    // Only called in PartitionPruneV2ForShortCircuitPlan constructor
+    PartitionPrunerV2Base() {
+        this.idToPartitionItem = null;
+        this.partitionColumns = null;
+        this.columnNameToRange = null;
+    }
+
     public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
             List<Column> partitionColumns,
             Map<String, ColumnRange> columnNameToRange) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java
new file mode 100644
index 00000000000..c8fd07f940d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import com.google.common.collect.Range;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class RangeMap<C extends Comparable<C>, V> {
+
+    private final NavigableMap<Range<C>, V> rangeMap = new TreeMap<>(new 
RangeComparator<C>());
+
+    public void put(Range<C> range, V value) {
+        rangeMap.put(range, value);
+    }
+
+    public List<V> getOverlappingRangeValues(Range<C> searchRange) {
+        return getOverlappingRanges(searchRange).stream()
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+    }
+
+    public List<Map.Entry<Range<C>, V>> getOverlappingRanges(Range<C> 
searchRange) {
+        List<Map.Entry<Range<C>, V>> overlappingRanges = new ArrayList<>();
+
+        // Find the possible starting point for the search
+        Map.Entry<Range<C>, V> floorEntry = rangeMap.floorEntry(searchRange);
+        Map.Entry<Range<C>, V> ceilingEntry = 
rangeMap.ceilingEntry(searchRange);
+
+        // Start iterating from the floor entry if present, otherwise from the ceiling entry
+        Map.Entry<Range<C>, V> startEntry = (floorEntry != null) ? floorEntry 
: ceilingEntry;
+        if (startEntry == null) {
+            return overlappingRanges;
+        }
+
+        for (Map.Entry<Range<C>, V> entry : 
rangeMap.tailMap(startEntry.getKey()).entrySet()) {
+            if 
(entry.getKey().lowerEndpoint().compareTo(searchRange.upperEndpoint()) > 0) {
+                break; // No more overlapping ranges possible
+            }
+            if (entry.getKey().isConnected(searchRange) && 
!entry.getKey().intersection(searchRange).isEmpty()) {
+                overlappingRanges.add(entry);
+            }
+        }
+        return overlappingRanges;
+    }
+
+    private static class RangeComparator<C extends Comparable<C>> implements 
java.util.Comparator<Range<C>> {
+        @Override
+        public int compare(Range<C> r1, Range<C> r2) {
+            return r1.lowerEndpoint().compareTo(r2.lowerEndpoint());
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index 0ffb5b989d8..0d18b2f08c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -121,6 +121,9 @@ public class PointQueryExec implements CoordInterface {
         OlapScanNode planRoot = getPlanRoot();
         // compute scan range
         List<TScanRangeLocations> locations = 
planRoot.lazyEvaluateRangeLocations();
+        if (planRoot.getScanTabletIds().isEmpty()) {
+            return;
+        }
         Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
         this.tabletID = planRoot.getScanTabletIds().get(0);
 
@@ -167,6 +170,10 @@ public class PointQueryExec implements CoordInterface {
     @Override
     public RowBatch getNext() throws Exception {
         setScanRangeLocations();
+        // No partition/tablet found, return empty row batch
+        if (candidateBackends == null || candidateBackends.isEmpty()) {
+            return new RowBatch();
+        }
         Iterator<Backend> backendIter = candidateBackends.iterator();
         RowBatch rowBatch = null;
         int tryCount = 0;
diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out 
b/regression-test/data/point_query_p0/test_point_query_partition.out
new file mode 100644
index 00000000000..cd22e6c93ec
--- /dev/null
+++ b/regression-test/data/point_query_p0/test_point_query_partition.out
@@ -0,0 +1,33 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !point_select --
+1      a
+
+-- !point_select --
+2      b
+
+-- !point_select --
+11     d
+
+-- !point_select --
+-1     c
+
+-- !point_select --
+11     d
+
+-- !point_select --
+
+-- !point_select --
+
+-- !point_select --
+33     f
+
+-- !point_select --
+45     g
+
+-- !point_select --
+
+-- !point_select --
+999    h
+
+-- !point_select --
+
diff --git 
a/regression-test/suites/point_query_p0/test_point_query_partition.groovy 
b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
new file mode 100644
index 00000000000..7b5966db0c1
--- /dev/null
+++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.math.BigDecimal;
+
+suite("test_point_query_partition") {
+    def user = context.config.jdbcUser
+    def password = context.config.jdbcPassword
+    def realDb = "regression_test_serving_p0"
+    def tableName = realDb + ".tbl_point_query_partition"
+    sql "CREATE DATABASE IF NOT EXISTS ${realDb}"
+
+    // Parse url
+    String jdbcUrl = context.config.jdbcUrl
+    String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+    def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+    def sql_port
+    if (urlWithoutSchema.indexOf("/") >= 0) {
+        // e.g: jdbc:mysql://localhost:8080/?a=b
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 
1, urlWithoutSchema.indexOf("/"))
+    } else {
+        // e.g: jdbc:mysql://localhost:8080
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 
1)
+    }
+    // set server side prepared statement url
+    def prepare_url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb 
+ "?&useServerPrepStmts=true"
+
+    def generateString = {len ->
+        def str = ""
+        for (int i = 0; i < len; i++) {
+            str += "a"
+        }
+        return str
+    }
+
+    def nprep_sql = { sql_str ->
+        def url_without_prep = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" 
+ realDb
+        connect(user = user, password = password, url = url_without_prep) {
+            sql sql_str
+        }
+    }
+
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+    sql """
+              CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(11) NULL COMMENT "",
+                `value` text NULL COMMENT ""
+              ) ENGINE=OLAP
+              UNIQUE KEY(`k1`)
+              PARTITION BY RANGE(`k1`)
+              (
+                  PARTITION `p1` VALUES LESS THAN ("1"),
+                  PARTITION `p2` VALUES LESS THAN ("10"),
+                  PARTITION `p3` VALUES LESS THAN ("30"),
+                  PARTITION `p4` VALUES LESS THAN ("40"),
+                  PARTITION `p5` VALUES LESS THAN ("1000")
+              )
+              DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+              PROPERTIES (
+              "replication_allocation" = "tag.location.default: 1",
+              "store_row_column" = "true",
+              "enable_unique_key_merge_on_write" = "true",
+              "light_schema_change" = "true",
+              "storage_format" = "V2")
+              """
+
+    sql """INSERT INTO ${tableName} VALUES (1, 'a')"""
+    sql """INSERT INTO ${tableName} VALUES (2, 'b')"""
+    sql """INSERT INTO ${tableName} VALUES (-1, 'c')"""
+    sql """INSERT INTO ${tableName} VALUES (11, 'd')"""
+    sql """INSERT INTO ${tableName} VALUES (15, 'e')"""
+    sql """INSERT INTO ${tableName} VALUES (33, 'f')"""
+    sql """INSERT INTO ${tableName} VALUES (45, 'g')"""
+    sql """INSERT INTO ${tableName} VALUES (999, 'h')"""
+    def result1 = connect(user=user, password=password, url=prepare_url) {
+        def stmt = prepareStatement "select * from ${tableName} where k1 = ?"
+        assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+        stmt.setInt(1, 1)
+        qe_point_select stmt
+        stmt.setInt(1, 2)
+        qe_point_select stmt
+        stmt.setInt(1, 11)
+        qe_point_select stmt
+        stmt.setInt(1, -1)
+        qe_point_select stmt
+        stmt.setInt(1, 11)
+        qe_point_select stmt
+        stmt.setInt(1, 12)
+        qe_point_select stmt
+        stmt.setInt(1, 34)
+        qe_point_select stmt
+        stmt.setInt(1, 33)
+        qe_point_select stmt
+        stmt.setInt(1, 45)
+        qe_point_select stmt
+        stmt.setInt(1, 666)
+        qe_point_select stmt
+        stmt.setInt(1, 999)
+        qe_point_select stmt
+        stmt.setInt(1, 1000)
+        qe_point_select stmt
+    }
+} 
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to