This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f2a82f6971a [fix](routine load) fix enclose and escape can not set in 
routine load job (#38402) (#39184)
f2a82f6971a is described below

commit f2a82f6971adeef996f0b1ef5c8d2995684d138f
Author: hui lai <[email protected]>
AuthorDate: Sun Aug 11 17:46:32 2024 +0800

    [fix](routine load) fix enclose and escape can not set in routine load job 
(#38402) (#39184)
    
    pick (#38402)
---
 .../doris/analysis/AlterRoutineLoadStmt.java       |  8 ++++++
 .../doris/analysis/CreateRoutineLoadStmt.java      | 30 +++++++++++++++++++++-
 .../org/apache/doris/analysis/DataDescription.java | 19 +++++++++++++-
 .../java/org/apache/doris/analysis/LoadStmt.java   |  4 +++
 .../doris/load/routineload/RoutineLoadJob.java     |  9 ++++++-
 5 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 8493ab25661..21dc4dec651 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -66,6 +66,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
+            .add(LoadStmt.KEY_ENCLOSE)
+            .add(LoadStmt.KEY_ESCAPE)
             .build();
 
     private final LabelName labelName;
@@ -242,6 +244,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
                     String.valueOf(isPartialUpdate));
         }
+        if (jobProperties.containsKey(LoadStmt.KEY_ENCLOSE)) {
+            analyzedJobProperties.put(LoadStmt.KEY_ENCLOSE, 
jobProperties.get(LoadStmt.KEY_ENCLOSE));
+        }
+        if (jobProperties.containsKey(LoadStmt.KEY_ESCAPE)) {
+            analyzedJobProperties.put(LoadStmt.KEY_ESCAPE, 
jobProperties.get(LoadStmt.KEY_ESCAPE));
+        }
     }
 
     private void checkDataSourceProperties() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index c825b6d6539..0e2ed01c6c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -138,6 +138,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
+            .add(LoadStmt.KEY_ENCLOSE)
+            .add(LoadStmt.KEY_ESCAPE)
             .build();
 
     private final LabelName labelName;
@@ -174,7 +176,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private boolean stripOuterArray = false;
     private boolean numAsString = false;
     private boolean fuzzyParse = false;
-
+    private byte enclose;
+    private byte escape;
     /**
      * support partial columns load(Only Unique Key Columns)
      */
@@ -302,6 +305,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return jsonPaths;
     }
 
+    public byte getEnclose() {
+        return enclose;
+    }
+
+    public byte getEscape() {
+        return escape;
+    }
+
     public String getJsonRoot() {
         return jsonRoot;
     }
@@ -486,6 +497,23 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                 RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
                 LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
 
+        String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
+        if (encloseStr != null) {
+            if (encloseStr.length() != 1) {
+                throw new AnalysisException("enclose must be single-char");
+            } else {
+                enclose = encloseStr.getBytes()[0];
+            }
+        }
+        String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE); // null when the property is not set
+        if (escapeStr != null) {
+            if (escapeStr.length() != 1) {
+                throw new AnalysisException("escape must be single-char");
+            } else {
+                escape = escapeStr.getBytes()[0];
+            }
+        }
+
         if (ConnectContext.get() != null) {
             timezone = ConnectContext.get().getSessionVariable().getTimeZone();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 618c80df5c0..f7944eca68e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -51,6 +51,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -862,7 +863,7 @@ public class DataDescription implements InsertStmt.DataDesc 
{
             return;
         }
         String columnsSQL = "COLUMNS (" + columnDef + ")";
-        SqlParser parser = new SqlParser(new SqlScanner(new 
StringReader(columnsSQL)));
+        SqlParser parser = new SqlParser(new 
org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL)));
         ImportColumnsStmt columnsStmt;
         try {
             columnsStmt = (ImportColumnsStmt) 
SqlParserUtils.getFirstStmt(parser);
@@ -998,6 +999,22 @@ public class DataDescription implements 
InsertStmt.DataDesc {
         if (analysisMap.containsKey(LoadStmt.KEY_SKIP_LINES)) {
             skipLines = 
Integer.parseInt(analysisMap.get(LoadStmt.KEY_SKIP_LINES));
         }
+        if (analysisMap.containsKey(LoadStmt.KEY_ENCLOSE)) {
+            String encloseProp = analysisMap.get(LoadStmt.KEY_ENCLOSE);
+            if (encloseProp.length() == 1) {
+                enclose = encloseProp.getBytes(StandardCharsets.UTF_8)[0];
+            } else {
+                throw new AnalysisException("enclose must be single-char");
+            }
+        }
+        if (analysisMap.containsKey(LoadStmt.KEY_ESCAPE)) {
+            String escapeProp = analysisMap.get(LoadStmt.KEY_ESCAPE);
+            if (escapeProp.length() == 1) {
+                escape = escapeProp.getBytes(StandardCharsets.UTF_8)[0];
+            } else {
+                throw new AnalysisException("escape must be single-char");
+            }
+        }
     }
 
     private void checkLoadPriv(String fullDbName) throws AnalysisException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index fab011af528..773d54ee56d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -125,6 +125,10 @@ public class LoadStmt extends DdlStmt {
 
     public static final String KEY_COMMENT = "comment";
 
+    public static final String KEY_ENCLOSE = "enclose";
+
+    public static final String KEY_ESCAPE = "escape";
+
     private final LabelName label;
     private final List<DataDescription> dataDescriptions;
     private final BrokerDesc brokerDesc;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 58eff713b45..57aa84e7731 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -350,7 +350,6 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             this.isPartialUpdate = true;
         }
         jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, 
String.valueOf(maxFilterRatio));
-
         if (Strings.isNullOrEmpty(stmt.getFormat()) || 
stmt.getFormat().equals("csv")) {
             jobProperties.put(PROPS_FORMAT, "csv");
         } else if (stmt.getFormat().equals("json")) {
@@ -384,6 +383,14 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         } else {
             jobProperties.put(PROPS_FUZZY_PARSE, "false");
         }
+        if (String.valueOf(stmt.getEnclose()) != null) {
+            this.enclose = stmt.getEnclose();
+            jobProperties.put(LoadStmt.KEY_ENCLOSE, 
String.valueOf(stmt.getEnclose()));
+        }
+        if (String.valueOf(stmt.getEscape()) != null) {
+            this.escape = stmt.getEscape();
+            jobProperties.put(LoadStmt.KEY_ESCAPE, 
String.valueOf(stmt.getEscape()));
+        }
     }
 
     private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to