This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 221e860cb7 [Feature](Routine Load)Support Partial Update (#22785)
221e860cb7 is described below

commit 221e860cb7c86d79bc99185f34ae2dd2f7a10be8
Author: Calvin Kirs <[email protected]>
AuthorDate: Thu Aug 10 17:41:53 2023 +0800

    [Feature](Routine Load)Support Partial Update (#22785)
---
 .../Load/ALTER-ROUTINE-LOAD.md                     |  1 +
 .../Load/CREATE-ROUTINE-LOAD.md                    |  3 +++
 .../Load/ALTER-ROUTINE-LOAD.md                     |  1 +
 .../Load/CREATE-ROUTINE-LOAD.md                    |  3 +++
 .../doris/analysis/AlterRoutineLoadStmt.java       | 29 ++++++++++++++++++++++
 .../doris/analysis/CreateRoutineLoadStmt.java      | 27 ++++++++++++++++----
 .../load/routineload/KafkaRoutineLoadJob.java      |  4 +++
 .../doris/load/routineload/RoutineLoadJob.java     | 15 ++++++++++-
 8 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
index 3a9d4d79cf..0e45d21245 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
@@ -69,6 +69,7 @@ FROM data_source
    10. `timezone`
    11. `num_as_string`
    12. `fuzzy_parse`
+   13. `partial_columns`
 
 
 4. `data_source`
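
A minimal usage sketch for the newly alterable property (the job and database
names are hypothetical, and ALTER ROUTINE LOAD generally requires the job to be
paused first):

    PAUSE ROUTINE LOAD FOR example_db.example_job;
    ALTER ROUTINE LOAD FOR example_db.example_job
    PROPERTIES ("partial_columns" = "true");
    RESUME ROUTINE LOAD FOR example_db.example_job;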
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index 0d1966d6d6..2814d3b20c 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -249,6 +249,9 @@ FROM data_source [data_source_properties]
   11. `load_to_single_tablet`
       Boolean type, True means that one task can only load data to one tablet in the corresponding partition at a time. The default value is false. This parameter can only be set when loading data into the OLAP table with random partition.
 
+  12. `partial_columns`
+      Boolean type. True means partial-column update is used; the default value is false. This parameter can only be set when the table model is Unique and Merge on Write is enabled. Multi-table load does not support this parameter.
+
 - `FROM data_source [data_source_properties]`
 
   The type of data source. Currently supports:
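
A minimal creation sketch, assuming a Unique Key table built with
"enable_unique_key_merge_on_write" = "true" (the table, topic, and broker names
are hypothetical):

    CREATE ROUTINE LOAD example_db.update_job ON example_tbl
    COLUMNS(k1, v2)
    PROPERTIES
    (
        "format" = "json",
        "partial_columns" = "true"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092",
        "kafka_topic" = "example_topic"
    );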
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
index fbf1e57969..b52ce004d7 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
@@ -69,6 +69,7 @@ FROM data_source
     10. `timezone`
     11. `num_as_string`
     12. `fuzzy_parse`
+    13. `partial_columns`
 
 
 4. `data_source`
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index 7181984784..60bce047a0 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -248,6 +248,9 @@ FROM data_source [data_source_properties]
 
       Boolean type. True means that one task only loads data into a single tablet of the corresponding partition at a time. The default value is false. This parameter can only be set when loading data into an OLAP table with random partitioning.
 
+  12. `partial_columns`
+      Boolean type. True means partial-column update is used; the default value is false. This parameter can only be set when the table model is Unique and Merge on Write is enabled. Single-stream multi-table load does not support this parameter.
+
 - `FROM data_source [data_source_properties]`
 
   The type of data source. Currently supports:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 8c32ae3cb5..0e25725cce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -17,7 +17,10 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
@@ -59,6 +62,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)
             .add(CreateRoutineLoadStmt.NUM_AS_STRING)
             .add(CreateRoutineLoadStmt.FUZZY_PARSE)
+            .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
             .build();
@@ -67,6 +71,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
     private final Map<String, String> jobProperties;
     private final Map<String, String> dataSourceMapProperties;
 
+    private boolean isPartialUpdate;
+
     // save analyzed job properties.
     // analyzed data source properties are saved in dataSourceProperties.
     private Map<String, String> analyzedJobProperties = Maps.newHashMap();
@@ -76,6 +82,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
         this.labelName = labelName;
         this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap();
         this.dataSourceMapProperties = dataSourceProperties != null ? dataSourceProperties : Maps.newHashMap();
+        this.isPartialUpdate = this.jobProperties.getOrDefault(CreateRoutineLoadStmt.PARTIAL_COLUMNS, "false")
+                .equalsIgnoreCase("true");
     }
 
     public String getDbName() {
@@ -111,12 +119,29 @@ public class AlterRoutineLoadStmt extends DdlStmt {
         checkJobProperties();
         // check data source properties
         checkDataSourceProperties();
+        checkPartialUpdate();
 
         if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)) {
             throw new AnalysisException("No properties are specified");
         }
     }
 
+    private void checkPartialUpdate() throws UserException {
+        if (!isPartialUpdate) {
+            return;
+        }
+        RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
+                .getJob(getDbName(), getLabel());
+        if (job.isMultiTable()) {
+            throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load.");
+        }
+        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName());
+        Table table = db.getTableOrAnalysisException(job.getTableName());
+        if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
+            throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
+        }
+    }
+
     private void checkJobProperties() throws UserException {
         Optional<String> optional = jobProperties.keySet().stream().filter(
                 entity -> !CONFIGURABLE_JOB_PROPERTIES_SET.contains(entity)).findFirst();
@@ -203,6 +228,10 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             boolean fuzzyParse = Boolean.parseBoolean(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE));
             analyzedJobProperties.put(CreateRoutineLoadStmt.FUZZY_PARSE, String.valueOf(fuzzyParse));
         }
+        if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+            analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
+                    String.valueOf(isPartialUpdate));
+        }
     }
 
     private void checkDataSourceProperties() throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 0f196bd77b..3d0720a10c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -38,6 +38,7 @@ import org.apache.doris.qe.ConnectContext;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -106,6 +107,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final String NUM_AS_STRING = "num_as_string";
     public static final String FUZZY_PARSE = "fuzzy_parse";
 
+    public static final String PARTIAL_COLUMNS = "partial_columns";
+
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -131,6 +134,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(EXEC_MEM_LIMIT_PROPERTY)
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
+            .add(PARTIAL_COLUMNS)
             .build();
 
     private final LabelName labelName;
@@ -157,8 +161,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     /**
      * RoutineLoad support json data.
      * Require Params:
-     *   1) dataFormat = "json"
-     *   2) jsonPaths = "$.XXX.xxx"
+     * 1) dataFormat = "json"
+     * 2) jsonPaths = "$.XXX.xxx"
      */
     private String format = ""; //default is csv.
     private String jsonPaths = "";
@@ -167,6 +171,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private boolean numAsString = false;
     private boolean fuzzyParse = false;
 
+    /**
+     * Support partial-column update (Unique Key model only).
+     */
+    @Getter
+    private boolean isPartialUpdate = false;
+
     private String comment = "";
 
     private LoadTask.MergeType mergeType;
@@ -196,6 +206,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
                 .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
+        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
         if (comment != null) {
             this.comment = comment;
         }
@@ -323,6 +334,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         dbName = labelName.getDbName();
         name = labelName.getLabelName();
         Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+        if (isPartialUpdate && isMultiTable) {
+            throw new AnalysisException("Partial update is not supported in multi-table load.");
+        }
         if (isMultiTable) {
             return;
         }
@@ -339,6 +353,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                 && !(table.getType() == Table.TableType.OLAP && ((OlapTable) table).hasDeleteSign())) {
             throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
         }
+        if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
+            throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
+        }
     }
 
     public void checkLoadProperties() throws UserException {
@@ -409,9 +426,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             }
         }
         routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt,
-                        precedingImportWhereStmt, importWhereStmt,
-                        partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType,
-                        importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
+                precedingImportWhereStmt, importWhereStmt,
+                partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType,
+                importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
     }
 
     private void checkJobProperties() throws UserException {
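
The analyze() checks above reject unsupported combinations at creation time; a
sketch of a statement that should now fail, assuming duplicate_tbl stands in
for any table without Merge-on-Write enabled (all names hypothetical):

    CREATE ROUTINE LOAD example_db.bad_job ON duplicate_tbl
    PROPERTIES ("partial_columns" = "true")
    FROM KAFKA ("kafka_broker_list" = "broker1:9092", "kafka_topic" = "t");
    -- expected error: load by PARTIAL_COLUMNS is only supported in unique table MoW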
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 0d6cb534cf..43cd058589 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -53,6 +53,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -683,6 +684,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
             modifyCommonJobProperties(copiedJobProperties);
             this.jobProperties.putAll(copiedJobProperties);
+            if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+                this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS));
+            }
         }
         LOG.info("modify the properties of kafka routine load job: {}, 
jobProperties: {}, datasource properties: {}",
                 this.id, jobProperties, dataSourceProperties);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index b0ae15b3f7..da4a2ce7f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -196,6 +198,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS;
     protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;
 
+    protected boolean isPartialUpdate = false;
+
     protected String sequenceCol;
 
     /**
@@ -335,6 +337,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(this.execMemLimit));
         jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism));
         jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet));
+        jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, stmt.isPartialUpdate() ? "true" : "false");
+        if (stmt.isPartialUpdate()) {
+            this.isPartialUpdate = true;
+        }
 
         if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) {
             jobProperties.put(PROPS_FORMAT, "csv");
@@ -624,7 +630,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     @Override
     public boolean isPartialUpdate() {
-        return false;
+        return isPartialUpdate;
     }
 
     @Override
@@ -1472,6 +1478,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
         appendProperties(sb, PROPS_FORMAT, getFormat(), false);
+        if (isPartialUpdate) {
+            appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, isPartialUpdate, false);
+        }
         appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
         appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false);
         appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
@@ -1550,6 +1559,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             jobProperties.put("columnSeparator", columnSeparator == null ? 
"\t" : columnSeparator.toString());
             jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : 
lineDelimiter.toString());
         }
+        jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
         jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
         jobProperties.put("maxBatchIntervalS", 
String.valueOf(maxBatchIntervalS));
         jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows));
@@ -1700,6 +1710,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             String key = Text.readString(in);
             String value = Text.readString(in);
             jobProperties.put(key, value);
+            if (key.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+                isPartialUpdate = Boolean.parseBoolean(value);
+            }
         }
 
         size = in.readInt();
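
Since the job now serializes and displays the flag, a sketch for verifying it
round-trips (job name hypothetical):

    SHOW ROUTINE LOAD FOR example_db.example_job\G
    -- the returned job properties are expected to include "partial_columns":"true"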


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
