This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 70c7e3d7aa [feature-wip](unique-key-merge-on-write) remove AggType on 
unique table with MoW, enable preAggreation, DSIP-018[5/2] (#11205)
70c7e3d7aa is described below

commit 70c7e3d7aa30cfdbddbc6ba1725c22f270343c9b
Author: zhannngchen <[email protected]>
AuthorDate: Thu Jul 28 17:03:05 2022 +0800

    [feature-wip](unique-key-merge-on-write) remove AggType on unique table 
with MoW, enable preAggreation, DSIP-018[5/2] (#11205)
    
     remove AggType on unique table with MoW, enable preAggregation
---
 be/src/olap/delta_writer.cpp                       |  5 +-
 be/src/olap/memtable.cpp                           | 87 +++++++++++++++-------
 be/src/olap/memtable.h                             | 14 ++--
 .../org/apache/doris/analysis/CreateTableStmt.java | 19 ++++-
 .../doris/datasource/InternalDataSource.java       |  5 ++
 .../doris/planner/MaterializedViewSelector.java    | 22 ++++--
 .../org/apache/doris/planner/OlapScanNode.java     |  8 +-
 .../apache/doris/analysis/CreateTableStmtTest.java | 46 ++++++++++++
 .../planner/MaterializedViewSelectorTest.java      |  8 +-
 9 files changed, 163 insertions(+), 51 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 04b7327f25..e50a8567b4 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -278,9 +278,8 @@ Status DeltaWriter::wait_flush() {
 }
 
 void DeltaWriter::_reset_mem_table() {
-    _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), 
_tablet_schema.get(),
-                                  _req.slots, _req.tuple_desc, 
_tablet->keys_type(),
-                                  _rowset_writer.get(), _is_vec));
+    _mem_table.reset(new MemTable(_tablet, _schema.get(), 
_tablet_schema.get(), _req.slots,
+                                  _req.tuple_desc, _rowset_writer.get(), 
_is_vec));
 }
 
 Status DeltaWriter::close() {
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 46607f6a6c..a1ade071d0 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -29,16 +29,15 @@
 
 namespace doris {
 
-MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* 
tablet_schema,
+MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* 
tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, 
TupleDescriptor* tuple_desc,
-                   KeysType keys_type, RowsetWriter* rowset_writer, bool 
support_vec)
-        : _tablet_id(tablet_id),
+                   RowsetWriter* rowset_writer, bool support_vec)
+        : _tablet(std::move(tablet)),
           _schema(schema),
           _tablet_schema(tablet_schema),
           _slot_descs(slot_descs),
-          _keys_type(keys_type),
           _mem_tracker(std::make_unique<MemTracker>(
-                  fmt::format("MemTable:tabletId={}", 
std::to_string(tablet_id)))),
+                  fmt::format("MemTable:tabletId={}", 
std::to_string(tablet_id())))),
           _buffer_mem_pool(new MemPool(_mem_tracker.get())),
           _table_mem_pool(new MemPool(_mem_tracker.get())),
           _schema_size(_schema->schema_size()),
@@ -52,18 +51,19 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const 
TabletSchema* tablet
         _skip_list = nullptr;
         _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
         // TODO: Support ZOrderComparator in the future
-        _vec_skip_list = std::make_unique<VecTable>(
-                _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type 
== KeysType::DUP_KEYS);
+        _vec_skip_list =
+                std::make_unique<VecTable>(_vec_row_comparator.get(), 
_table_mem_pool.get(),
+                                           keys_type() == KeysType::DUP_KEYS);
         _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     } else {
         _vec_skip_list = nullptr;
-        if (_keys_type == KeysType::DUP_KEYS) {
+        if (keys_type() == KeysType::DUP_KEYS) {
             _insert_fn = &MemTable::_insert_dup;
         } else {
             _insert_fn = &MemTable::_insert_agg;
         }
-        if (_tablet_schema->has_sequence_col()) {
-            _aggregate_two_row_fn = 
&MemTable::_aggregate_two_row_with_sequence;
+        if (keys_type() == KeysType::UNIQUE_KEYS && 
_tablet->enable_unique_key_merge_on_write()) {
+            _aggregate_two_row_fn = &MemTable::_replace_row;
         } else {
             _aggregate_two_row_fn = &MemTable::_aggregate_two_row;
         }
@@ -74,7 +74,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const 
TabletSchema* tablet
             _row_comparator = std::make_shared<RowCursorComparator>(_schema);
         }
         _skip_list = std::make_unique<Table>(_row_comparator.get(), 
_table_mem_pool.get(),
-                                             _keys_type == KeysType::DUP_KEYS);
+                                             keys_type() == 
KeysType::DUP_KEYS);
     }
 }
 void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
@@ -92,9 +92,18 @@ void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescript
 
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
     for (uint32_t cid = _schema->num_key_columns(); cid < 
_schema->num_columns(); ++cid) {
-        vectorized::AggregateFunctionPtr function =
-                
_tablet_schema->column(cid).get_aggregate_function({block->get_data_type(cid)},
-                                                                   
vectorized::AGG_LOAD_SUFFIX);
+        vectorized::AggregateFunctionPtr function;
+        if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+            _tablet->enable_unique_key_merge_on_write()) {
+            // In such table, non-key column's aggregation type is NONE, so we 
need to construct
+            // the aggregate function manually.
+            function = 
vectorized::AggregateFunctionSimpleFactory::instance().get(
+                    "replace_load", {block->get_data_type(cid)}, {},
+                    block->get_data_type(cid)->is_nullable());
+        } else {
+            function = _tablet_schema->column(cid).get_aggregate_function(
+                    {block->get_data_type(cid)}, vectorized::AGG_LOAD_SUFFIX);
+        }
 
         DCHECK(function != nullptr);
         _agg_functions[cid] = function;
@@ -118,7 +127,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* 
block) {
 }
 
 MemTable::~MemTable() {
-    if (_vec_skip_list != nullptr && _keys_type != KeysType::DUP_KEYS) {
+    if (_vec_skip_list != nullptr && keys_type() != KeysType::DUP_KEYS) {
         VecTable::Iterator it(_vec_skip_list.get());
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
             // We should release agg_places here, because they are not relesed 
when a
@@ -160,7 +169,7 @@ void MemTable::insert(const vectorized::Block* input_block, 
const std::vector<in
         _input_mutable_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
         _vec_row_comparator->set_block(&_input_mutable_block);
         _output_mutable_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        if (_keys_type != KeysType::DUP_KEYS) {
+        if (keys_type() != KeysType::DUP_KEYS) {
             _init_agg_functions(&target_block);
         }
     }
@@ -180,7 +189,7 @@ void MemTable::insert(const vectorized::Block* input_block, 
const std::vector<in
 void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
     _rows++;
     bool overwritten = false;
-    if (_keys_type == KeysType::DUP_KEYS) {
+    if (keys_type() == KeysType::DUP_KEYS) {
         // TODO: dup keys only need sort opertaion. Rethink skiplist is the 
beat way to sort columns?
         _vec_skip_list->Insert(row_in_block, &overwritten);
         DCHECK(!overwritten) << "Duplicate key model meet overwrite in 
SkipList";
@@ -255,14 +264,38 @@ void MemTable::_tuple_to_row(const Tuple* tuple, 
ContiguousRow* row, MemPool* me
 
 void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey 
row_in_skiplist) {
     ContiguousRow dst_row(_schema, row_in_skiplist);
+    if (_tablet_schema->has_sequence_col()) {
+        return agg_update_row_with_sequence(&dst_row, src_row, 
_tablet_schema->sequence_col_idx(),
+                                            _table_mem_pool.get());
+    }
     agg_update_row(&dst_row, src_row, _table_mem_pool.get());
 }
 
-void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
-                                                TableKey row_in_skiplist) {
+// In the Unique Key table with primary key index, the non-key column's 
aggregation
+// type is NONE, to replace the data in duplicate row, we should copy the data 
manually.
+void MemTable::_replace_row(const ContiguousRow& src_row, TableKey 
row_in_skiplist) {
     ContiguousRow dst_row(_schema, row_in_skiplist);
-    agg_update_row_with_sequence(&dst_row, src_row, 
_tablet_schema->sequence_col_idx(),
-                                 _table_mem_pool.get());
+    if (_tablet_schema->has_sequence_col()) {
+        const int32_t sequence_idx = _tablet_schema->sequence_col_idx();
+        auto seq_dst_cell = dst_row.cell(sequence_idx);
+        auto seq_src_cell = src_row.cell(sequence_idx);
+        auto res = _schema->column(sequence_idx)->compare_cell(seq_dst_cell, 
seq_src_cell);
+        // dst sequence column larger than src, don't need to replace
+        if (res > 0) {
+            return;
+        }
+    }
+    // do replace
+    for (uint32_t cid = dst_row.schema()->num_key_columns(); cid < 
dst_row.schema()->num_columns();
+         ++cid) {
+        auto dst_cell = dst_row.cell(cid);
+        auto src_cell = src_row.cell(cid);
+        auto column = _schema->column(cid);
+        // Dest cell already allocated memory, use direct_copy rather than 
deep_copy(which will
+        // allocate memory for dst_cell). If dst_cell's size is smaller than 
src_cell, direct_copy
+        // will reallocate the memory to fit the src_cell's data.
+        column->direct_copy(&dst_cell, src_cell);
+    }
 }
 
 void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* 
row_in_skiplist) {
@@ -287,7 +320,7 @@ template <bool is_final>
 void MemTable::_collect_vskiplist_results() {
     VecTable::Iterator it(_vec_skip_list.get());
     vectorized::Block in_block = _input_mutable_block.to_block();
-    if (_keys_type == KeysType::DUP_KEYS) {
+    if (keys_type() == KeysType::DUP_KEYS) {
         std::vector<int> row_pos_vec;
         DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
         row_pos_vec.reserve(in_block.rows());
@@ -347,7 +380,7 @@ void MemTable::_collect_vskiplist_results() {
 }
 
 void MemTable::shrink_memtable_by_agg() {
-    if (_keys_type == KeysType::DUP_KEYS) {
+    if (keys_type() == KeysType::DUP_KEYS) {
         return;
     }
     _collect_vskiplist_results<false>();
@@ -358,18 +391,18 @@ bool MemTable::is_flush() const {
 }
 
 bool MemTable::need_to_agg() {
-    return _keys_type == KeysType::DUP_KEYS ? is_flush()
-                                            : memory_usage() >= 
config::memtable_max_buffer_size;
+    return keys_type() == KeysType::DUP_KEYS ? is_flush()
+                                             : memory_usage() >= 
config::memtable_max_buffer_size;
 }
 
 Status MemTable::flush() {
-    VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
+    VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
                   << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
     RETURN_NOT_OK(_do_flush(duration_ns));
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 
1000);
-    VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
+    VLOG_CRITICAL << "after flush memtable for tablet: " << tablet_id()
                   << ", flushsize: " << _flush_size;
     return Status::OK();
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9be1f7d33d..0a18f51753 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -22,6 +22,7 @@
 #include "common/object_pool.h"
 #include "olap/olap_define.h"
 #include "olap/skiplist.h"
+#include "olap/tablet.h"
 #include "runtime/memory/mem_tracker.h"
 #include "util/tuple_row_zorder_compare.h"
 #include "vec/aggregate_functions/aggregate_function.h"
@@ -40,12 +41,13 @@ class TupleDescriptor;
 
 class MemTable {
 public:
-    MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* 
tablet_schema,
+    MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* 
tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* 
tuple_desc,
-             KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec 
= false);
+             RowsetWriter* rowset_writer, bool support_vec = false);
     ~MemTable();
 
-    int64_t tablet_id() const { return _tablet_id; }
+    int64_t tablet_id() const { return _tablet->tablet_id(); }
+    KeysType keys_type() const { return _tablet->keys_type(); }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
 
     inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
@@ -132,19 +134,19 @@ public:
 private:
     void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* 
mem_pool);
     void _aggregate_two_row(const ContiguousRow& new_row, TableKey 
row_in_skiplist);
-    void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, 
TableKey row_in_skiplist);
+    void _replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist);
     void _insert_dup(const Tuple* tuple);
     void _insert_agg(const Tuple* tuple);
     // for vectorized
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* 
row_in_skiplist);
 
-    int64_t _tablet_id;
+private:
+    TabletSharedPtr _tablet;
     Schema* _schema;
     const TabletSchema* _tablet_schema;
     // the slot in _slot_descs are in order of tablet's schema
     const std::vector<SlotDescriptor*>* _slot_descs;
-    KeysType _keys_type;
 
     // TODO: change to unique_ptr of comparator
     std::shared_ptr<RowComparator> _row_comparator;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 27546c6ec9..de10c63b47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.external.elasticsearch.EsUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -47,6 +48,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -276,6 +278,14 @@ public class CreateTableStmt extends DdlStmt {
         if (engineName.equals("hive") && !Config.enable_spark_load) {
             throw new AnalysisException("Spark Load from hive table is coming 
soon");
         }
+
+        // `analyzeUniqueKeyMergeOnWrite` would modify `properties`, which 
will be used later,
+        // so we just clone a properties map here.
+        boolean enableUniqueKeyMergeOnWrite = false;
+        if (properties != null) {
+            enableUniqueKeyMergeOnWrite = 
PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(new HashMap<>(properties));
+        }
+
         // analyze key desc
         if (engineName.equalsIgnoreCase("olap")) {
             // olap table
@@ -339,6 +349,9 @@ public class CreateTableStmt extends DdlStmt {
                 if (keysDesc.getKeysType() == KeysType.DUP_KEYS) {
                     type = AggregateType.NONE;
                 }
+                if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS && 
enableUniqueKeyMergeOnWrite) {
+                    type = AggregateType.NONE;
+                }
                 for (int i = keysDesc.keysColumnSize(); i < columnDefs.size(); 
++i) {
                     columnDefs.get(i).setAggregateType(type);
                 }
@@ -363,7 +376,11 @@ public class CreateTableStmt extends DdlStmt {
         if (Config.enable_batch_delete_by_default
                 && keysDesc != null
                 && keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
-            
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
+            // TODO(zhangchen): Disable the delete sign column for primary key 
temporarily, will replace
+            // with a better solution later.
+            if (!enableUniqueKeyMergeOnWrite) {
+                
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
+            }
         }
         boolean hasObjectStored = false;
         String objectStoredColumn = "";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 6796ba922b..cb484d2820 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -1890,6 +1890,11 @@ public class InternalDataSource implements 
DataSourceIf<Database> {
         try {
             sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, 
olapTable.getKeysType());
             if (sequenceColType != null) {
+                // TODO(zhannngchen) will support sequence column later.
+                if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
+                    throw new AnalysisException("Unique key table with 
MoW(merge on write) not support "
+                        + "sequence column for now");
+                }
                 olapTable.setSequenceInfo(sequenceColType);
             }
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
index aa613540e3..dbdcbfb765 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
@@ -140,13 +140,14 @@ public class MaterializedViewSelector {
         // Step2: check all columns in compensating predicates are available 
in the view output
         checkCompensatingPredicates(columnNamesInPredicates.get(tableId), 
candidateIndexIdToMeta);
         // Step3: group by list in query is the subset of group by list in 
view or view contains no aggregation
-        checkGrouping(columnNamesInGrouping.get(tableId), 
candidateIndexIdToMeta);
+        checkGrouping(table, columnNamesInGrouping.get(tableId), 
candidateIndexIdToMeta);
         // Step4: aggregation functions are available in the view output
-        checkAggregationFunction(aggColumnsInQuery.get(tableId), 
candidateIndexIdToMeta);
+        checkAggregationFunction(table, aggColumnsInQuery.get(tableId), 
candidateIndexIdToMeta);
         // Step5: columns required to compute output expr are available in the 
view output
         checkOutputColumns(columnNamesInQueryOutput.get(tableId), 
candidateIndexIdToMeta);
         // Step6: if table type is aggregate and the candidateIndexIdToSchema 
is empty,
-        if ((table.getKeysType() == KeysType.AGG_KEYS || table.getKeysType() 
== KeysType.UNIQUE_KEYS)
+        if ((table.getKeysType() == KeysType.AGG_KEYS || (table.getKeysType() 
== KeysType.UNIQUE_KEYS
+                && !table.getTableProperty().getEnableUniqueKeyMergeOnWrite()))
                 && candidateIndexIdToMeta.size() == 0) {
             // the base index will be added in the candidateIndexIdToSchema.
             /**
@@ -299,7 +300,7 @@ public class MaterializedViewSelector {
      * @param candidateIndexIdToMeta
      */
 
-    private void checkGrouping(Set<String> columnsInGrouping, Map<Long, 
MaterializedIndexMeta>
+    private void checkGrouping(OlapTable table, Set<String> columnsInGrouping, 
Map<Long, MaterializedIndexMeta>
             candidateIndexIdToMeta) {
         Iterator<Map.Entry<Long, MaterializedIndexMeta>> iterator = 
candidateIndexIdToMeta.entrySet().iterator();
         while (iterator.hasNext()) {
@@ -325,8 +326,10 @@ public class MaterializedViewSelector {
 
             ISSUE-3016, MaterializedViewFunctionTest: testDeduplicateQueryInAgg
              */
-            if (indexNonAggregatedColumnNames.size() == 
candidateIndexSchema.size()
-                    && candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) {
+            boolean noNeedAggregation = candidateIndexMeta.getKeysType() == 
KeysType.DUP_KEYS
+                    || (candidateIndexMeta.getKeysType() == 
KeysType.UNIQUE_KEYS
+                    && 
table.getTableProperty().getEnableUniqueKeyMergeOnWrite());
+            if (indexNonAggregatedColumnNames.size() == 
candidateIndexSchema.size() && noNeedAggregation) {
                 continue;
             }
             // When the query is SPJ type but the candidate index is SPJG 
type, it will not pass directly.
@@ -348,7 +351,7 @@ public class MaterializedViewSelector {
                           + 
Joiner.on(",").join(candidateIndexIdToMeta.keySet()));
     }
 
-    private void checkAggregationFunction(Set<FunctionCallExpr> 
aggregatedColumnsInQueryOutput,
+    private void checkAggregationFunction(OlapTable table, 
Set<FunctionCallExpr> aggregatedColumnsInQueryOutput,
             Map<Long, MaterializedIndexMeta> candidateIndexIdToMeta) throws 
AnalysisException {
         Iterator<Map.Entry<Long, MaterializedIndexMeta>> iterator = 
candidateIndexIdToMeta.entrySet().iterator();
         while (iterator.hasNext()) {
@@ -356,7 +359,10 @@ public class MaterializedViewSelector {
             MaterializedIndexMeta candidateIndexMeta = entry.getValue();
             List<FunctionCallExpr> indexAggColumnExpsList = 
mvAggColumnsToExprList(candidateIndexMeta);
             // When the candidate index is SPJ type, it passes the 
verification directly
-            if (indexAggColumnExpsList.size() == 0 && 
candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) {
+            boolean noNeedAggregation = candidateIndexMeta.getKeysType() == 
KeysType.DUP_KEYS
+                    || (candidateIndexMeta.getKeysType() == 
KeysType.UNIQUE_KEYS
+                    && 
table.getTableProperty().getEnableUniqueKeyMergeOnWrite());
+            if (indexAggColumnExpsList.size() == 0 && noNeedAggregation) {
                 continue;
             }
             // When the query is SPJ type but the candidate index is SPJG 
type, it will not pass directly.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 2d4e9399e3..787d8a771f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -275,8 +275,9 @@ public class OlapScanNode extends ScanNode {
         String situation;
         boolean update;
         CHECK: { // CHECKSTYLE IGNORE THIS LINE
-            if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
-                situation = "The key type of table is duplicate.";
+            if (olapTable.getKeysType() == KeysType.DUP_KEYS || 
(olapTable.getKeysType() == KeysType.UNIQUE_KEYS
+                    && olapTable.getEnableUniqueKeyMergeOnWrite())) {
+                situation = "The key type of table is duplicate, or unique key 
with merge-on-write.";
                 update = true;
                 break CHECK;
             }
@@ -659,7 +660,8 @@ public class OlapScanNode extends ScanNode {
     public void selectBestRollupByRollupSelector(Analyzer analyzer) throws 
UserException {
         // Step2: select best rollup
         long start = System.currentTimeMillis();
-        if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
+        if (olapTable.getKeysType() == KeysType.DUP_KEYS || 
(olapTable.getKeysType() == KeysType.UNIQUE_KEYS
+                && olapTable.getEnableUniqueKeyMergeOnWrite())) {
             // This function is compatible with the INDEX selection logic of 
ROLLUP,
             // so the Duplicate table here returns base index directly
             // and the selection logic of materialized view is selected in
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index 5371482345..44e62d2dc5 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.datasource.InternalDataSource;
 import org.apache.doris.mysql.privilege.MockedAuth;
 import org.apache.doris.mysql.privilege.PaloAuth;
@@ -122,6 +123,51 @@ public class CreateTableStmtTest {
         Assert.assertTrue(stmt.toSql().contains("DISTRIBUTED BY 
RANDOM\nBUCKETS 6"));
     }
 
+    @Test
+    public void testCreateTableUniqueKeyNormal() throws UserException {
+        // setup
+        Map<String, String> properties = new HashMap<>();
+        ColumnDef col3 = new ColumnDef("col3", new 
TypeDef(ScalarType.createType(PrimitiveType.BIGINT)));
+        col3.setIsKey(false);
+        cols.add(col3);
+        ColumnDef col4 = new ColumnDef("col4", new 
TypeDef(ScalarType.createType(PrimitiveType.STRING)));
+        col4.setIsKey(false);
+        cols.add(col4);
+        // test normal case
+        CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, 
cols, "olap",
+                new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null,
+                new HashDistributionDesc(10, Lists.newArrayList("col1")), 
properties, null, "");
+        stmt.analyze(analyzer);
+        Assert.assertEquals(col3.getAggregateType(), AggregateType.REPLACE);
+        Assert.assertEquals(col4.getAggregateType(), AggregateType.REPLACE);
+        // clear
+        cols.remove(col3);
+        cols.remove(col4);
+    }
+
+    @Test
+    public void testCreateTableUniqueKeyMoW() throws UserException {
+        // setup
+        Map<String, String> properties = new HashMap<>();
+        properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, 
"true");
+        ColumnDef col3 = new ColumnDef("col3", new 
TypeDef(ScalarType.createType(PrimitiveType.BIGINT)));
+        col3.setIsKey(false);
+        cols.add(col3);
+        ColumnDef col4 = new ColumnDef("col4", new 
TypeDef(ScalarType.createType(PrimitiveType.STRING)));
+        col4.setIsKey(false);
+        cols.add(col4);
+        // test merge-on-write
+        CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, 
cols, "olap",
+                new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null,
+                new HashDistributionDesc(10, Lists.newArrayList("col1")), 
properties, null, "");
+        stmt.analyze(analyzer);
+        Assert.assertEquals(col3.getAggregateType(), AggregateType.NONE);
+        Assert.assertEquals(col4.getAggregateType(), AggregateType.NONE);
+        // clear
+        cols.remove(col3);
+        cols.remove(col4);
+    }
+
     @Test
     public void testCreateTableWithRollup() throws UserException {
         List<AlterClause> ops = Lists.newArrayList();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
index 6f8d711c0e..820c90d4c9 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
@@ -210,6 +210,7 @@ public class MaterializedViewSelectorTest {
 
     @Test
     public void testCheckGrouping(@Injectable SelectStmt selectStmt, 
@Injectable Analyzer analyzer,
+            @Injectable OlapTable table,
             @Injectable MaterializedIndexMeta indexMeta1,
             @Injectable MaterializedIndexMeta indexMeta2,
             @Injectable MaterializedIndexMeta indexMeta3) {
@@ -249,7 +250,7 @@ public class MaterializedViewSelectorTest {
 
         MaterializedViewSelector selector = new 
MaterializedViewSelector(selectStmt, analyzer);
         Deencapsulation.setField(selector, "isSPJQuery", false);
-        Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames, 
candidateIndexIdToSchema);
+        Deencapsulation.invoke(selector, "checkGrouping", table, 
tableAColumnNames, candidateIndexIdToSchema);
         Assert.assertEquals(2, candidateIndexIdToSchema.size());
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new 
Long(1)));
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new 
Long(2)));
@@ -257,6 +258,7 @@ public class MaterializedViewSelectorTest {
 
     @Test
     public void testCheckAggregationFunction(@Injectable SelectStmt 
selectStmt, @Injectable Analyzer analyzer,
+            @Injectable OlapTable table,
             @Injectable MaterializedIndexMeta indexMeta1,
             @Injectable MaterializedIndexMeta indexMeta2,
             @Injectable MaterializedIndexMeta indexMeta3) {
@@ -299,8 +301,8 @@ public class MaterializedViewSelectorTest {
         Set<FunctionCallExpr> aggregatedColumnsInQueryOutput = 
Sets.newHashSet();
         aggregatedColumnsInQueryOutput.add(functionCallExpr);
         Deencapsulation.setField(selector, "isSPJQuery", false);
-        Deencapsulation.invoke(selector, "checkAggregationFunction", 
aggregatedColumnsInQueryOutput,
-                               candidateIndexIdToSchema);
+        Deencapsulation.invoke(selector, "checkAggregationFunction", table, 
aggregatedColumnsInQueryOutput,
+                candidateIndexIdToSchema);
         Assert.assertEquals(2, candidateIndexIdToSchema.size());
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new 
Long(1)));
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new 
Long(3)));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to