This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a41593672a9 [fix](spark-load)fix-Unique-key-with-MOR-by-sparkload #26383 (#26414)
a41593672a9 is described below
commit a41593672a9014bedc2c22ce9778e6c92d5415a1
Author: wuwenchi <[email protected]>
AuthorDate: Fri Nov 3 23:36:51 2023 +0800
[fix](spark-load)fix-Unique-key-with-MOR-by-sparkload #26383 (#26414)
---
.../org/apache/doris/load/loadv2/SparkLoadPendingTask.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 8e9599f774b..32749fd8a77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -247,10 +247,13 @@ public class SparkLoadPendingTask extends LoadTask {
long indexId = entry.getKey();
int schemaHash = table.getSchemaHashByIndexId(indexId);
+ boolean changeAggType =
table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
+ &&
table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
+
// columns
List<EtlColumn> etlColumns = Lists.newArrayList();
for (Column column : entry.getValue()) {
- etlColumns.add(createEtlColumn(column));
+ etlColumns.add(createEtlColumn(column, changeAggType));
}
// check distribution type
@@ -290,7 +293,7 @@ public class SparkLoadPendingTask extends LoadTask {
return etlIndexes;
}
- private EtlColumn createEtlColumn(Column column) {
+ private EtlColumn createEtlColumn(Column column, boolean changeAggType) {
// column name
String name = column.getName().toLowerCase(Locale.ROOT);
// column type
@@ -304,7 +307,11 @@ public class SparkLoadPendingTask extends LoadTask {
// aggregation type
String aggregationType = null;
if (column.getAggregationType() != null) {
- aggregationType = column.getAggregationType().toString();
+ if (changeAggType && !column.isKey()) {
+ aggregationType = AggregateType.REPLACE.toSql();
+ } else {
+ aggregationType = column.getAggregationType().toString();
+ }
}
// default value
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]