dataroaring commented on code in PR #39756:
URL: https://github.com/apache/doris/pull/39756#discussion_r1775069398
##########
be/src/http/action/stream_load.cpp:
##########
@@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
request.__set_enable_profile(false);
}
}
- if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
+
+ StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
+ {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
+ {"UPDATE_FIXED_COLUMNS",
TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
+ {"UPDATE_FLEXIBLE_COLUMNS",
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
Review Comment:
Put this map inside the `if` block below.
##########
be/src/http/action/stream_load.cpp:
##########
@@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
request.__set_enable_profile(false);
}
}
- if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
+
+ StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
+ {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
+ {"UPDATE_FIXED_COLUMNS",
TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
+ {"UPDATE_FLEXIBLE_COLUMNS",
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
+ if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
+ std::string unique_key_update_mode_str =
http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
+ auto iter =
unique_key_update_mode_map.find(unique_key_update_mode_str);
+ if (iter != unique_key_update_mode_map.end()) {
+ TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
+ if (unique_key_update_mode ==
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
+ // check constraints when flexible partial update is enabled
+ if (ctx->format != TFileFormatType::FORMAT_JSON) {
+ return Status::InvalidArgument(
+ "flexible partial update only support json format
as input file "
+ "currently");
+ }
+ if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
+ iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
fuzzy_parse is enabled");
+ }
+ if (!http_req->header(HTTP_COLUMNS).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
columns is specified");
+ }
+ if (!http_req->header(HTTP_JSONPATHS).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
jsonpaths is specified");
+ }
+ if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
hidden_columns is "
+ "specified");
+ }
+ if (!http_req->header(HTTP_FUNCTION_COLUMN + "." +
HTTP_SEQUENCE_COL).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when "
+ "function_column.sequence_col is specified");
+ }
+ if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when "
+ "merge_type is specified");
+ }
+ }
+ request.__set_unique_key_update_mode(unique_key_update_mode);
+ } else {
+ return Status::InvalidArgument(
+ "Invalid unique_key_partial_mode {}, must be UPSERT,
PARTIAL_UPDATE or "
+ "FLEXIBLE_PARTIAL_UPDATE",
Review Comment:
PARTIAL_UPDATE -> UPDATE_FIXED_COLUMNS
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -3559,6 +3559,12 @@ private static void addOlapTablePropertyInfo(OlapTable
olapTable, StringBuilder
sb.append(olapTable.getEnableUniqueKeyMergeOnWrite()).append("\"");
}
+ // enable_unique_key_skip_bitmap, always print this property for
merge-on-write unique table
+ if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS &&
olapTable.getEnableUniqueKeyMergeOnWrite()) {
+
sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_SKIP_BITMAP_COLUMN).append("\"
= \"");
+ sb.append(olapTable.getEnableUniqueKeySkipBitmap()).append("\"");
+ }
Review Comment:
We should print it only if it is true.
##########
fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java:
##########
@@ -126,8 +127,9 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int
fragmentInstanceIdInde
&& !destTable.hasDeleteSign()) {
throw new AnalysisException("load by MERGE or DELETE need to
upgrade table to support batch delete.");
}
-
- if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() &&
destTable.getSequenceMapCol() == null) {
+ TUniqueKeyUpdateMode uniquekeyUpdateMode =
taskInfo.getUniqueKeyUpdateMode();
+ if (uniquekeyUpdateMode != TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
+ && destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() &&
destTable.getSequenceMapCol() == null) {
throw new UserException("Table " + destTable.getName()
+ " has sequence column, need to specify the sequence
column");
}
Review Comment:
We should rethink this: why do we handle fixed partial update and flexible
partial update differently?
##########
be/src/http/action/stream_load.cpp:
##########
@@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
request.__set_enable_profile(false);
}
}
- if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
+
+ StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
+ {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
+ {"UPDATE_FIXED_COLUMNS",
TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
+ {"UPDATE_FLEXIBLE_COLUMNS",
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
+ if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
+ std::string unique_key_update_mode_str =
http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
+ auto iter =
unique_key_update_mode_map.find(unique_key_update_mode_str);
+ if (iter != unique_key_update_mode_map.end()) {
+ TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
+ if (unique_key_update_mode ==
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
+ // check constraints when flexible partial update is enabled
+ if (ctx->format != TFileFormatType::FORMAT_JSON) {
+ return Status::InvalidArgument(
+ "flexible partial update only support json format
as input file "
+ "currently");
+ }
+ if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
+ iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
fuzzy_parse is enabled");
+ }
+ if (!http_req->header(HTTP_COLUMNS).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
columns is specified");
+ }
+ if (!http_req->header(HTTP_JSONPATHS).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
jsonpaths is specified");
+ }
+ if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when
hidden_columns is "
+ "specified");
+ }
+ if (!http_req->header(HTTP_FUNCTION_COLUMN + "." +
HTTP_SEQUENCE_COL).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when "
+ "function_column.sequence_col is specified");
+ }
+ if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
+ return Status::InvalidArgument(
+ "Don't support flexible partial update when "
+ "merge_type is specified");
+ }
+ }
+ request.__set_unique_key_update_mode(unique_key_update_mode);
+ } else {
+ return Status::InvalidArgument(
+ "Invalid unique_key_partial_mode {}, must be UPSERT,
PARTIAL_UPDATE or "
+ "FLEXIBLE_PARTIAL_UPDATE",
Review Comment:
UPDATE_FLEXIBLE_COLUMNS
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java:
##########
@@ -704,6 +705,14 @@ public static ColumnDefinition
newVersionColumnDefinition(AggregateType aggregat
Optional.of(new DefaultValue(DefaultValue.ZERO_NUMBER)),
"doris version hidden column", false);
}
+ // used in CreateTableInfo.validate(), specify the default value as
DefaultValue.NULL_DEFAULT_VALUE
+ // becasue ColumnDefinition.validate() will check that bitmap type column
don't set default value
+ // and then set the default value of that column to bitmap_empty()
+ public static ColumnDefinition newSkipBitmapColumnDef(AggregateType
aggregateType) {
+ return new ColumnDefinition(Column.SKIP_BITMAP_COL,
BitmapType.INSTANCE, false, aggregateType, false,
+ Optional.of(DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE), "doris
skip bitmap hidden column", false);
+ }
Review Comment:
We already support EMPTY_BITMAP.
##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1422,6 +1422,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_hidden_version_column_by_default = true;
+ /**
+ * Whether to add a skip bitmap column when create merge-on-write unique
table
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean enable_skip_bitmap_column_by_default = true;
Review Comment:
We set it false now.
##########
be/src/olap/memtable.cpp:
##########
@@ -293,6 +331,12 @@ size_t MemTable::_sort() {
auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
std::inplace_merge(_row_in_blocks.begin(), new_row_it,
_row_in_blocks.end(), cmp_func);
_last_sorted_pos = _row_in_blocks.size();
+ {
+ std::string res;
+ for (const auto& row : _row_in_blocks) {
+ res += fmt::format(",{}", row->_row_pos);
+ }
+ }
Review Comment:
useless code?
##########
fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java:
##########
@@ -163,8 +166,21 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int
fragmentInstanceIdInde
}
}
+ if (uniquekeyUpdateMode ==
TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS &&
!destTable.hasSkipBitmapColumn()) {
+ throw new UserException("Flexible partial update can only support
table with skip bitmap hidden column."
+ + " But table " + destTable.getName() + " doesn't have
it");
Review Comment:
Provide a hint to the user on how to add the hidden column.
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -774,10 +782,34 @@ Status NewJsonReader::_set_column_value(rapidjson::Value&
objectValue, Block& bl
}
has_valid_value = true;
} else {
- // not found, filling with default value
- RETURN_IF_ERROR(_fill_missing_column(slot_desc, column_ptr,
valid));
- if (!(*valid)) {
- return Status::OK();
+ if (_should_process_skip_bitmap_col()) {
+ // not found, skip this column in flexible partial update
+ if (slot_desc->is_key() && !slot_desc->is_auto_increment()) {
+ RETURN_IF_ERROR(
+ _append_error_msg(objectValue,
+ "The key columns can not be
ommited in flexible "
+ "partial update, missing key
column: {}",
+ slot_desc->col_name(), valid));
+ // remove this line in block
+ for (int i = 0; i < block.columns(); ++i) {
+ auto column =
block.get_by_position(i).column->assume_mutable();
+ if (column->size() != cur_row_count) {
+ DCHECK(column->size() == cur_row_count + 1);
+ column->pop_back(1);
+ DCHECK(column->size() == cur_row_count);
+ }
+ }
Review Comment:
This should follow max_filter_ratio.
##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java:
##########
@@ -238,7 +248,15 @@ public String getExplainString(String prefix,
TExplainLevel explainLevel) {
}
strBuilder.append(prefix + " TUPLE ID: " + tupleDescriptor.getId() +
"\n");
strBuilder.append(prefix + " " +
DataPartition.RANDOM.getExplainString(explainLevel));
+ boolean isPartialUpdate = uniqueKeyUpdateMode !=
TUniqueKeyUpdateMode.UPSERT;
strBuilder.append(prefix + " IS_PARTIAL_UPDATE: " + isPartialUpdate);
+ if (isPartialUpdate) {
+ if (uniqueKeyUpdateMode ==
TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
+ strBuilder.append(prefix + " PARTIAL_UPDATE_MODE:
FIXED_PARTIAL_UPDATE");
+ } else {
+ strBuilder.append(prefix + " PARTIAL_UPDATE_MODE:
FLEXIBLE_PARTIAL_UPDATE");
Review Comment:
This should be the same as the option in stream load.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]