This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 70c7e3d7aa [feature-wip](unique-key-merge-on-write) remove AggType on
unique table with MoW, enable preAggregation, DSIP-018[5/2] (#11205)
70c7e3d7aa is described below
commit 70c7e3d7aa30cfdbddbc6ba1725c22f270343c9b
Author: zhannngchen <[email protected]>
AuthorDate: Thu Jul 28 17:03:05 2022 +0800
[feature-wip](unique-key-merge-on-write) remove AggType on unique table
with MoW, enable preAggregation, DSIP-018[5/2] (#11205)
remove AggType on unique table with MoW, enable preAggregation
---
be/src/olap/delta_writer.cpp | 5 +-
be/src/olap/memtable.cpp | 87 +++++++++++++++-------
be/src/olap/memtable.h | 14 ++--
.../org/apache/doris/analysis/CreateTableStmt.java | 19 ++++-
.../doris/datasource/InternalDataSource.java | 5 ++
.../doris/planner/MaterializedViewSelector.java | 22 ++++--
.../org/apache/doris/planner/OlapScanNode.java | 8 +-
.../apache/doris/analysis/CreateTableStmtTest.java | 46 ++++++++++++
.../planner/MaterializedViewSelectorTest.java | 8 +-
9 files changed, 163 insertions(+), 51 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 04b7327f25..e50a8567b4 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -278,9 +278,8 @@ Status DeltaWriter::wait_flush() {
}
void DeltaWriter::_reset_mem_table() {
- _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(),
_tablet_schema.get(),
- _req.slots, _req.tuple_desc,
_tablet->keys_type(),
- _rowset_writer.get(), _is_vec));
+ _mem_table.reset(new MemTable(_tablet, _schema.get(),
_tablet_schema.get(), _req.slots,
+ _req.tuple_desc, _rowset_writer.get(),
_is_vec));
}
Status DeltaWriter::close() {
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 46607f6a6c..a1ade071d0 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -29,16 +29,15 @@
namespace doris {
-MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema*
tablet_schema,
+MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema*
tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs,
TupleDescriptor* tuple_desc,
- KeysType keys_type, RowsetWriter* rowset_writer, bool
support_vec)
- : _tablet_id(tablet_id),
+ RowsetWriter* rowset_writer, bool support_vec)
+ : _tablet(std::move(tablet)),
_schema(schema),
_tablet_schema(tablet_schema),
_slot_descs(slot_descs),
- _keys_type(keys_type),
_mem_tracker(std::make_unique<MemTracker>(
- fmt::format("MemTable:tabletId={}",
std::to_string(tablet_id)))),
+ fmt::format("MemTable:tabletId={}",
std::to_string(tablet_id())))),
_buffer_mem_pool(new MemPool(_mem_tracker.get())),
_table_mem_pool(new MemPool(_mem_tracker.get())),
_schema_size(_schema->schema_size()),
@@ -52,18 +51,19 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const
TabletSchema* tablet
_skip_list = nullptr;
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
// TODO: Support ZOrderComparator in the future
- _vec_skip_list = std::make_unique<VecTable>(
- _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type
== KeysType::DUP_KEYS);
+ _vec_skip_list =
+ std::make_unique<VecTable>(_vec_row_comparator.get(),
_table_mem_pool.get(),
+ keys_type() == KeysType::DUP_KEYS);
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
} else {
_vec_skip_list = nullptr;
- if (_keys_type == KeysType::DUP_KEYS) {
+ if (keys_type() == KeysType::DUP_KEYS) {
_insert_fn = &MemTable::_insert_dup;
} else {
_insert_fn = &MemTable::_insert_agg;
}
- if (_tablet_schema->has_sequence_col()) {
- _aggregate_two_row_fn =
&MemTable::_aggregate_two_row_with_sequence;
+ if (keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
+ _aggregate_two_row_fn = &MemTable::_replace_row;
} else {
_aggregate_two_row_fn = &MemTable::_aggregate_two_row;
}
@@ -74,7 +74,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const
TabletSchema* tablet
_row_comparator = std::make_shared<RowCursorComparator>(_schema);
}
_skip_list = std::make_unique<Table>(_row_comparator.get(),
_table_mem_pool.get(),
- _keys_type == KeysType::DUP_KEYS);
+ keys_type() ==
KeysType::DUP_KEYS);
}
}
void MemTable::_init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
@@ -92,9 +92,18 @@ void MemTable::_init_columns_offset_by_slot_descs(const
std::vector<SlotDescript
void MemTable::_init_agg_functions(const vectorized::Block* block) {
for (uint32_t cid = _schema->num_key_columns(); cid <
_schema->num_columns(); ++cid) {
- vectorized::AggregateFunctionPtr function =
-
_tablet_schema->column(cid).get_aggregate_function({block->get_data_type(cid)},
-
vectorized::AGG_LOAD_SUFFIX);
+ vectorized::AggregateFunctionPtr function;
+ if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+ _tablet->enable_unique_key_merge_on_write()) {
+ // In such table, non-key column's aggregation type is NONE, so we
need to construct
+ // the aggregate function manually.
+ function =
vectorized::AggregateFunctionSimpleFactory::instance().get(
+ "replace_load", {block->get_data_type(cid)}, {},
+ block->get_data_type(cid)->is_nullable());
+ } else {
+ function = _tablet_schema->column(cid).get_aggregate_function(
+ {block->get_data_type(cid)}, vectorized::AGG_LOAD_SUFFIX);
+ }
DCHECK(function != nullptr);
_agg_functions[cid] = function;
@@ -118,7 +127,7 @@ void MemTable::_init_agg_functions(const vectorized::Block*
block) {
}
MemTable::~MemTable() {
- if (_vec_skip_list != nullptr && _keys_type != KeysType::DUP_KEYS) {
+ if (_vec_skip_list != nullptr && keys_type() != KeysType::DUP_KEYS) {
VecTable::Iterator it(_vec_skip_list.get());
for (it.SeekToFirst(); it.Valid(); it.Next()) {
// We should release agg_places here, because they are not released
when a
@@ -160,7 +169,7 @@ void MemTable::insert(const vectorized::Block* input_block,
const std::vector<in
_input_mutable_block =
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
_vec_row_comparator->set_block(&_input_mutable_block);
_output_mutable_block =
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
- if (_keys_type != KeysType::DUP_KEYS) {
+ if (keys_type() != KeysType::DUP_KEYS) {
_init_agg_functions(&target_block);
}
}
@@ -180,7 +189,7 @@ void MemTable::insert(const vectorized::Block* input_block,
const std::vector<in
void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
_rows++;
bool overwritten = false;
- if (_keys_type == KeysType::DUP_KEYS) {
+ if (keys_type() == KeysType::DUP_KEYS) {
// TODO: dup keys only need sort operation. Rethink whether skiplist is the
best way to sort columns?
_vec_skip_list->Insert(row_in_block, &overwritten);
DCHECK(!overwritten) << "Duplicate key model meet overwrite in
SkipList";
@@ -255,14 +264,38 @@ void MemTable::_tuple_to_row(const Tuple* tuple,
ContiguousRow* row, MemPool* me
void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey
row_in_skiplist) {
ContiguousRow dst_row(_schema, row_in_skiplist);
+ if (_tablet_schema->has_sequence_col()) {
+ return agg_update_row_with_sequence(&dst_row, src_row,
_tablet_schema->sequence_col_idx(),
+ _table_mem_pool.get());
+ }
agg_update_row(&dst_row, src_row, _table_mem_pool.get());
}
-void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
- TableKey row_in_skiplist) {
+// In the Unique Key table with primary key index, the non-key column's
aggregation
+// type is NONE, to replace the data in duplicate row, we should copy the data
manually.
+void MemTable::_replace_row(const ContiguousRow& src_row, TableKey
row_in_skiplist) {
ContiguousRow dst_row(_schema, row_in_skiplist);
- agg_update_row_with_sequence(&dst_row, src_row,
_tablet_schema->sequence_col_idx(),
- _table_mem_pool.get());
+ if (_tablet_schema->has_sequence_col()) {
+ const int32_t sequence_idx = _tablet_schema->sequence_col_idx();
+ auto seq_dst_cell = dst_row.cell(sequence_idx);
+ auto seq_src_cell = src_row.cell(sequence_idx);
+ auto res = _schema->column(sequence_idx)->compare_cell(seq_dst_cell,
seq_src_cell);
+ // dst sequence column larger than src, don't need to replace
+ if (res > 0) {
+ return;
+ }
+ }
+ // do replace
+ for (uint32_t cid = dst_row.schema()->num_key_columns(); cid <
dst_row.schema()->num_columns();
+ ++cid) {
+ auto dst_cell = dst_row.cell(cid);
+ auto src_cell = src_row.cell(cid);
+ auto column = _schema->column(cid);
// Dest cell already allocated memory, use direct_copy rather than
deep_copy(which will
+ // allocate memory for dst_cell). If dst_cell's size is smaller than
src_cell, direct_copy
+ // will reallocate the memory to fit the src_cell's data.
+ column->direct_copy(&dst_cell, src_cell);
+ }
}
void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock*
row_in_skiplist) {
@@ -287,7 +320,7 @@ template <bool is_final>
void MemTable::_collect_vskiplist_results() {
VecTable::Iterator it(_vec_skip_list.get());
vectorized::Block in_block = _input_mutable_block.to_block();
- if (_keys_type == KeysType::DUP_KEYS) {
+ if (keys_type() == KeysType::DUP_KEYS) {
std::vector<int> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
@@ -347,7 +380,7 @@ void MemTable::_collect_vskiplist_results() {
}
void MemTable::shrink_memtable_by_agg() {
- if (_keys_type == KeysType::DUP_KEYS) {
+ if (keys_type() == KeysType::DUP_KEYS) {
return;
}
_collect_vskiplist_results<false>();
@@ -358,18 +391,18 @@ bool MemTable::is_flush() const {
}
bool MemTable::need_to_agg() {
- return _keys_type == KeysType::DUP_KEYS ? is_flush()
- : memory_usage() >=
config::memtable_max_buffer_size;
+ return keys_type() == KeysType::DUP_KEYS ? is_flush()
+ : memory_usage() >=
config::memtable_max_buffer_size;
}
Status MemTable::flush() {
- VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
+ VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
<< ", memsize: " << memory_usage() << ", rows: " << _rows;
int64_t duration_ns = 0;
RETURN_NOT_OK(_do_flush(duration_ns));
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns /
1000);
- VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
+ VLOG_CRITICAL << "after flush memtable for tablet: " << tablet_id()
<< ", flushsize: " << _flush_size;
return Status::OK();
}
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9be1f7d33d..0a18f51753 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -22,6 +22,7 @@
#include "common/object_pool.h"
#include "olap/olap_define.h"
#include "olap/skiplist.h"
+#include "olap/tablet.h"
#include "runtime/memory/mem_tracker.h"
#include "util/tuple_row_zorder_compare.h"
#include "vec/aggregate_functions/aggregate_function.h"
@@ -40,12 +41,13 @@ class TupleDescriptor;
class MemTable {
public:
- MemTable(int64_t tablet_id, Schema* schema, const TabletSchema*
tablet_schema,
+ MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema*
tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor*
tuple_desc,
- KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec
= false);
+ RowsetWriter* rowset_writer, bool support_vec = false);
~MemTable();
- int64_t tablet_id() const { return _tablet_id; }
+ int64_t tablet_id() const { return _tablet->tablet_id(); }
+ KeysType keys_type() const { return _tablet->keys_type(); }
size_t memory_usage() const { return _mem_tracker->consumption(); }
inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
@@ -132,19 +134,19 @@ public:
private:
void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool*
mem_pool);
void _aggregate_two_row(const ContiguousRow& new_row, TableKey
row_in_skiplist);
- void _aggregate_two_row_with_sequence(const ContiguousRow& new_row,
TableKey row_in_skiplist);
+ void _replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist);
void _insert_dup(const Tuple* tuple);
void _insert_agg(const Tuple* tuple);
// for vectorized
void _insert_one_row_from_block(RowInBlock* row_in_block);
void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock*
row_in_skiplist);
- int64_t _tablet_id;
+private:
+ TabletSharedPtr _tablet;
Schema* _schema;
const TabletSchema* _tablet_schema;
// the slot in _slot_descs are in order of tablet's schema
const std::vector<SlotDescriptor*>* _slot_descs;
- KeysType _keys_type;
// TODO: change to unique_ptr of comparator
std::shared_ptr<RowComparator> _row_comparator;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 27546c6ec9..de10c63b47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.external.elasticsearch.EsUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -47,6 +48,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -276,6 +278,14 @@ public class CreateTableStmt extends DdlStmt {
if (engineName.equals("hive") && !Config.enable_spark_load) {
throw new AnalysisException("Spark Load from hive table is coming
soon");
}
+
+ // `analyzeUniqueKeyMergeOnWrite` would modify `properties`, which
will be used later,
+ // so we just clone a properties map here.
+ boolean enableUniqueKeyMergeOnWrite = false;
+ if (properties != null) {
+ enableUniqueKeyMergeOnWrite =
PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(new HashMap<>(properties));
+ }
+
// analyze key desc
if (engineName.equalsIgnoreCase("olap")) {
// olap table
@@ -339,6 +349,9 @@ public class CreateTableStmt extends DdlStmt {
if (keysDesc.getKeysType() == KeysType.DUP_KEYS) {
type = AggregateType.NONE;
}
+ if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS &&
enableUniqueKeyMergeOnWrite) {
+ type = AggregateType.NONE;
+ }
for (int i = keysDesc.keysColumnSize(); i < columnDefs.size();
++i) {
columnDefs.get(i).setAggregateType(type);
}
@@ -363,7 +376,11 @@ public class CreateTableStmt extends DdlStmt {
if (Config.enable_batch_delete_by_default
&& keysDesc != null
&& keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
-
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
+ // TODO(zhangchen): Disable the delete sign column for primary key
temporary, will replace
+ // with a better solution later.
+ if (!enableUniqueKeyMergeOnWrite) {
+
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
+ }
}
boolean hasObjectStored = false;
String objectStoredColumn = "";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 6796ba922b..cb484d2820 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -1890,6 +1890,11 @@ public class InternalDataSource implements
DataSourceIf<Database> {
try {
sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties,
olapTable.getKeysType());
if (sequenceColType != null) {
+ // TODO(zhannngchen) will support sequence column later.
+ if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
+ throw new AnalysisException("Unique key table with
MoW(merge on write) not support "
+ + "sequence column for now");
+ }
olapTable.setSequenceInfo(sequenceColType);
}
} catch (Exception e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
index aa613540e3..dbdcbfb765 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
@@ -140,13 +140,14 @@ public class MaterializedViewSelector {
// Step2: check all columns in compensating predicates are available
in the view output
checkCompensatingPredicates(columnNamesInPredicates.get(tableId),
candidateIndexIdToMeta);
// Step3: group by list in query is the subset of group by list in
view or view contains no aggregation
- checkGrouping(columnNamesInGrouping.get(tableId),
candidateIndexIdToMeta);
+ checkGrouping(table, columnNamesInGrouping.get(tableId),
candidateIndexIdToMeta);
// Step4: aggregation functions are available in the view output
- checkAggregationFunction(aggColumnsInQuery.get(tableId),
candidateIndexIdToMeta);
+ checkAggregationFunction(table, aggColumnsInQuery.get(tableId),
candidateIndexIdToMeta);
// Step5: columns required to compute output expr are available in the
view output
checkOutputColumns(columnNamesInQueryOutput.get(tableId),
candidateIndexIdToMeta);
// Step6: if table type is aggregate and the candidateIndexIdToSchema
is empty,
- if ((table.getKeysType() == KeysType.AGG_KEYS || table.getKeysType()
== KeysType.UNIQUE_KEYS)
+ if ((table.getKeysType() == KeysType.AGG_KEYS || (table.getKeysType()
== KeysType.UNIQUE_KEYS
+ && !table.getTableProperty().getEnableUniqueKeyMergeOnWrite()))
&& candidateIndexIdToMeta.size() == 0) {
// the base index will be added in the candidateIndexIdToSchema.
/**
@@ -299,7 +300,7 @@ public class MaterializedViewSelector {
* @param candidateIndexIdToMeta
*/
- private void checkGrouping(Set<String> columnsInGrouping, Map<Long,
MaterializedIndexMeta>
+ private void checkGrouping(OlapTable table, Set<String> columnsInGrouping,
Map<Long, MaterializedIndexMeta>
candidateIndexIdToMeta) {
Iterator<Map.Entry<Long, MaterializedIndexMeta>> iterator =
candidateIndexIdToMeta.entrySet().iterator();
while (iterator.hasNext()) {
@@ -325,8 +326,10 @@ public class MaterializedViewSelector {
ISSUE-3016, MaterializedViewFunctionTest: testDeduplicateQueryInAgg
*/
- if (indexNonAggregatedColumnNames.size() ==
candidateIndexSchema.size()
- && candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) {
+ boolean noNeedAggregation = candidateIndexMeta.getKeysType() ==
KeysType.DUP_KEYS
+ || (candidateIndexMeta.getKeysType() ==
KeysType.UNIQUE_KEYS
+ &&
table.getTableProperty().getEnableUniqueKeyMergeOnWrite());
+ if (indexNonAggregatedColumnNames.size() ==
candidateIndexSchema.size() && noNeedAggregation) {
continue;
}
// When the query is SPJ type but the candidate index is SPJG
type, it will not pass directly.
@@ -348,7 +351,7 @@ public class MaterializedViewSelector {
+
Joiner.on(",").join(candidateIndexIdToMeta.keySet()));
}
- private void checkAggregationFunction(Set<FunctionCallExpr>
aggregatedColumnsInQueryOutput,
+ private void checkAggregationFunction(OlapTable table,
Set<FunctionCallExpr> aggregatedColumnsInQueryOutput,
Map<Long, MaterializedIndexMeta> candidateIndexIdToMeta) throws
AnalysisException {
Iterator<Map.Entry<Long, MaterializedIndexMeta>> iterator =
candidateIndexIdToMeta.entrySet().iterator();
while (iterator.hasNext()) {
@@ -356,7 +359,10 @@ public class MaterializedViewSelector {
MaterializedIndexMeta candidateIndexMeta = entry.getValue();
List<FunctionCallExpr> indexAggColumnExpsList =
mvAggColumnsToExprList(candidateIndexMeta);
// When the candidate index is SPJ type, it passes the
verification directly
- if (indexAggColumnExpsList.size() == 0 &&
candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) {
+ boolean noNeedAggregation = candidateIndexMeta.getKeysType() ==
KeysType.DUP_KEYS
+ || (candidateIndexMeta.getKeysType() ==
KeysType.UNIQUE_KEYS
+ &&
table.getTableProperty().getEnableUniqueKeyMergeOnWrite());
+ if (indexAggColumnExpsList.size() == 0 && noNeedAggregation) {
continue;
}
// When the query is SPJ type but the candidate index is SPJG
type, it will not pass directly.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 2d4e9399e3..787d8a771f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -275,8 +275,9 @@ public class OlapScanNode extends ScanNode {
String situation;
boolean update;
CHECK: { // CHECKSTYLE IGNORE THIS LINE
- if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
- situation = "The key type of table is duplicate.";
+ if (olapTable.getKeysType() == KeysType.DUP_KEYS ||
(olapTable.getKeysType() == KeysType.UNIQUE_KEYS
+ && olapTable.getEnableUniqueKeyMergeOnWrite())) {
+ situation = "The key type of table is duplicate, or unique key
with merge-on-write.";
update = true;
break CHECK;
}
@@ -659,7 +660,8 @@ public class OlapScanNode extends ScanNode {
public void selectBestRollupByRollupSelector(Analyzer analyzer) throws
UserException {
// Step2: select best rollup
long start = System.currentTimeMillis();
- if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
+ if (olapTable.getKeysType() == KeysType.DUP_KEYS ||
(olapTable.getKeysType() == KeysType.UNIQUE_KEYS
+ && olapTable.getEnableUniqueKeyMergeOnWrite())) {
// This function is compatible with the INDEX selection logic of
ROLLUP,
// so the Duplicate table here returns base index directly
// and the selection logic of materialized view is selected in
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index 5371482345..44e62d2dc5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalDataSource;
import org.apache.doris.mysql.privilege.MockedAuth;
import org.apache.doris.mysql.privilege.PaloAuth;
@@ -122,6 +123,51 @@ public class CreateTableStmtTest {
Assert.assertTrue(stmt.toSql().contains("DISTRIBUTED BY
RANDOM\nBUCKETS 6"));
}
+ @Test
+ public void testCreateTableUniqueKeyNormal() throws UserException {
+ // setup
+ Map<String, String> properties = new HashMap<>();
+ ColumnDef col3 = new ColumnDef("col3", new
TypeDef(ScalarType.createType(PrimitiveType.BIGINT)));
+ col3.setIsKey(false);
+ cols.add(col3);
+ ColumnDef col4 = new ColumnDef("col4", new
TypeDef(ScalarType.createType(PrimitiveType.STRING)));
+ col4.setIsKey(false);
+ cols.add(col4);
+ // test normal case
+ CreateTableStmt stmt = new CreateTableStmt(false, false, tblName,
cols, "olap",
+ new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null,
+ new HashDistributionDesc(10, Lists.newArrayList("col1")),
properties, null, "");
+ stmt.analyze(analyzer);
+ Assert.assertEquals(col3.getAggregateType(), AggregateType.REPLACE);
+ Assert.assertEquals(col4.getAggregateType(), AggregateType.REPLACE);
+ // clear
+ cols.remove(col3);
+ cols.remove(col4);
+ }
+
+ @Test
+ public void testCreateTableUniqueKeyMoW() throws UserException {
+ // setup
+ Map<String, String> properties = new HashMap<>();
+ properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE,
"true");
+ ColumnDef col3 = new ColumnDef("col3", new
TypeDef(ScalarType.createType(PrimitiveType.BIGINT)));
+ col3.setIsKey(false);
+ cols.add(col3);
+ ColumnDef col4 = new ColumnDef("col4", new
TypeDef(ScalarType.createType(PrimitiveType.STRING)));
+ col4.setIsKey(false);
+ cols.add(col4);
+ // test merge-on-write
+ CreateTableStmt stmt = new CreateTableStmt(false, false, tblName,
cols, "olap",
+ new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null,
+ new HashDistributionDesc(10, Lists.newArrayList("col1")),
properties, null, "");
+ stmt.analyze(analyzer);
+ Assert.assertEquals(col3.getAggregateType(), AggregateType.NONE);
+ Assert.assertEquals(col4.getAggregateType(), AggregateType.NONE);
+ // clear
+ cols.remove(col3);
+ cols.remove(col4);
+ }
+
@Test
public void testCreateTableWithRollup() throws UserException {
List<AlterClause> ops = Lists.newArrayList();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
index 6f8d711c0e..820c90d4c9 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
@@ -210,6 +210,7 @@ public class MaterializedViewSelectorTest {
@Test
public void testCheckGrouping(@Injectable SelectStmt selectStmt,
@Injectable Analyzer analyzer,
+ @Injectable OlapTable table,
@Injectable MaterializedIndexMeta indexMeta1,
@Injectable MaterializedIndexMeta indexMeta2,
@Injectable MaterializedIndexMeta indexMeta3) {
@@ -249,7 +250,7 @@ public class MaterializedViewSelectorTest {
MaterializedViewSelector selector = new
MaterializedViewSelector(selectStmt, analyzer);
Deencapsulation.setField(selector, "isSPJQuery", false);
- Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames,
candidateIndexIdToSchema);
+ Deencapsulation.invoke(selector, "checkGrouping", table,
tableAColumnNames, candidateIndexIdToSchema);
Assert.assertEquals(2, candidateIndexIdToSchema.size());
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(1)));
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(2)));
@@ -257,6 +258,7 @@ public class MaterializedViewSelectorTest {
@Test
public void testCheckAggregationFunction(@Injectable SelectStmt
selectStmt, @Injectable Analyzer analyzer,
+ @Injectable OlapTable table,
@Injectable MaterializedIndexMeta indexMeta1,
@Injectable MaterializedIndexMeta indexMeta2,
@Injectable MaterializedIndexMeta indexMeta3) {
@@ -299,8 +301,8 @@ public class MaterializedViewSelectorTest {
Set<FunctionCallExpr> aggregatedColumnsInQueryOutput =
Sets.newHashSet();
aggregatedColumnsInQueryOutput.add(functionCallExpr);
Deencapsulation.setField(selector, "isSPJQuery", false);
- Deencapsulation.invoke(selector, "checkAggregationFunction",
aggregatedColumnsInQueryOutput,
- candidateIndexIdToSchema);
+ Deencapsulation.invoke(selector, "checkAggregationFunction", table,
aggregatedColumnsInQueryOutput,
+ candidateIndexIdToSchema);
Assert.assertEquals(2, candidateIndexIdToSchema.size());
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(1)));
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new
Long(3)));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]