This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d9bd1fad67a [fix](spark-load)fix-Unique-key-with-MOR-by-sparkload
(#26383)
d9bd1fad67a is described below
commit d9bd1fad67af08d4d8027035e39855535de4a433
Author: wuwenchi <[email protected]>
AuthorDate: Mon Nov 6 09:56:46 2023 +0800
[fix](spark-load)fix-Unique-key-with-MOR-by-sparkload (#26383)
When a Unique key table has the `enable_unique_key_merge_on_write` property
enabled, the aggregation type of its columns is NONE. Therefore, when running
Spark Load, the aggregation type of non-key columns must be set to `REPLACE`.
---
.../org/apache/doris/load/loadv2/SparkLoadPendingTask.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 8e9599f774b..32749fd8a77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -247,10 +247,13 @@ public class SparkLoadPendingTask extends LoadTask {
long indexId = entry.getKey();
int schemaHash = table.getSchemaHashByIndexId(indexId);
+ boolean changeAggType =
table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
+ &&
table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
+
// columns
List<EtlColumn> etlColumns = Lists.newArrayList();
for (Column column : entry.getValue()) {
- etlColumns.add(createEtlColumn(column));
+ etlColumns.add(createEtlColumn(column, changeAggType));
}
// check distribution type
@@ -290,7 +293,7 @@ public class SparkLoadPendingTask extends LoadTask {
return etlIndexes;
}
- private EtlColumn createEtlColumn(Column column) {
+ private EtlColumn createEtlColumn(Column column, boolean changeAggType) {
// column name
String name = column.getName().toLowerCase(Locale.ROOT);
// column type
@@ -304,7 +307,11 @@ public class SparkLoadPendingTask extends LoadTask {
// aggregation type
String aggregationType = null;
if (column.getAggregationType() != null) {
- aggregationType = column.getAggregationType().toString();
+ if (changeAggType && !column.isKey()) {
+ aggregationType = AggregateType.REPLACE.toSql();
+ } else {
+ aggregationType = column.getAggregationType().toString();
+ }
}
// default value
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]