This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b3abfaba6d8 [feat](refactor-param) refactor routineLoad's code about 
fileformat (#50552)
b3abfaba6d8 is described below

commit b3abfaba6d847e12e3b35a1284dd5b68fd077e4a
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed May 14 10:03:47 2025 +0800

    [feat](refactor-param) refactor routineLoad's code about fileformat (#50552)
    
    Issue Number: #50238
    
    Problem Summary:
    
    Previously, we refactored the fileFormat attribute code (#50225).
    However, that change only introduced the new code without updating the
    business logic that uses it. This pull request updates the `RoutineLoad`
    feature code related to the file format.
---
 .../doris/analysis/CreateRoutineLoadStmt.java      | 105 +++------------------
 .../org/apache/doris/analysis/OutFileClause.java   |   2 +-
 .../doris/common/util/FileFormatConstants.java     |   4 -
 .../fileformat/AvroFileFormatProperties.java       |   2 +-
 .../fileformat/CsvFileFormatProperties.java        |  29 ++++--
 .../property/fileformat/FileFormatProperties.java  |  18 ++--
 .../fileformat/JsonFileFormatProperties.java       |   8 +-
 .../fileformat/OrcFileFormatProperties.java        |   2 +-
 .../fileformat/ParquetFileFormatProperties.java    |   2 +-
 .../fileformat/WalFileFormatProperties.java        |   2 +-
 .../doris/load/routineload/RoutineLoadJob.java     | 100 +++++++-------------
 .../plans/commands/info/CreateRoutineLoadInfo.java |  91 ++++--------------
 .../fileformat/CsvFileFormatPropertiesTest.java    |   2 +-
 .../load_p0/routine_load/test_routine_load.groovy  |   4 +-
 14 files changed, 115 insertions(+), 256 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 403612f989c..804d9647a53 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@@ -169,22 +170,8 @@ public class CreateRoutineLoadStmt extends DdlStmt 
implements NotFallbackInParse
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
     private int sendBatchParallelism = 1;
     private boolean loadToSingleTablet = false;
-    /**
-     * RoutineLoad support json data.
-     * Require Params:
-     * 1) dataFormat = "json"
-     * 2) jsonPaths = "$.XXX.xxx"
-     */
-    private String format = ""; //default is csv.
-    private String jsonPaths = "";
-    private String jsonRoot = ""; // MUST be a jsonpath string
-    private boolean stripOuterArray = false;
-    private boolean numAsString = false;
-    private boolean fuzzyParse = false;
 
-    private byte enclose;
-
-    private byte escape;
+    private FileFormatProperties fileFormatProperties;
 
     private String workloadGroupName = "";
 
@@ -229,6 +216,8 @@ public class CreateRoutineLoadStmt extends DdlStmt 
implements NotFallbackInParse
         if (comment != null) {
             this.comment = comment;
         }
+        String format = 
jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        fileFormatProperties = 
FileFormatProperties.createFileFormatProperties(format);
     }
 
     /*
@@ -239,9 +228,9 @@ public class CreateRoutineLoadStmt extends DdlStmt 
implements NotFallbackInParse
             Map<String, String> jobProperties, String typeName, 
RoutineLoadDesc routineLoadDesc,
             int desireTaskConcurrentNum, long maxErrorNum, double 
maxFilterRatio, long maxBatchIntervalS,
             long maxBatchRows, long maxBatchSizeBytes, long execMemLimit, int 
sendBatchParallelism, String timezone,
-            String format, String jsonPaths, String jsonRoot, byte enclose, 
byte escape, String workloadGroupName,
-            boolean loadToSingleTablet, boolean strictMode, boolean 
isPartialUpdate, boolean stripOuterArray,
-            boolean numAsString, boolean fuzzyParse, 
AbstractDataSourceProperties dataSourceProperties) {
+            String workloadGroupName, boolean loadToSingleTablet, boolean 
strictMode,
+            boolean isPartialUpdate, AbstractDataSourceProperties 
dataSourceProperties,
+            FileFormatProperties fileFormatProperties) {
         this.labelName = labelName;
         this.dbName = dbName;
         this.name = name;
@@ -261,19 +250,12 @@ public class CreateRoutineLoadStmt extends DdlStmt 
implements NotFallbackInParse
         this.execMemLimit = execMemLimit;
         this.sendBatchParallelism = sendBatchParallelism;
         this.timezone = timezone;
-        this.format = format;
-        this.jsonPaths = jsonPaths;
-        this.jsonRoot = jsonRoot;
-        this.enclose = enclose;
-        this.escape = escape;
         this.workloadGroupName = workloadGroupName;
         this.loadToSingleTablet = loadToSingleTablet;
         this.strictMode = strictMode;
         this.isPartialUpdate = isPartialUpdate;
-        this.stripOuterArray = stripOuterArray;
-        this.numAsString = numAsString;
-        this.fuzzyParse = fuzzyParse;
         this.dataSourceProperties = dataSourceProperties;
+        this.fileFormatProperties = fileFormatProperties;
     }
 
     public String getName() {
@@ -340,42 +322,14 @@ public class CreateRoutineLoadStmt extends DdlStmt 
implements NotFallbackInParse
         return timezone;
     }
 
-    public String getFormat() {
-        return format;
-    }
-
-    public boolean isStripOuterArray() {
-        return stripOuterArray;
-    }
-
-    public boolean isNumAsString() {
-        return numAsString;
-    }
-
-    public boolean isFuzzyParse() {
-        return fuzzyParse;
-    }
-
-    public String getJsonPaths() {
-        return jsonPaths;
-    }
-
-    public byte getEnclose() {
-        return enclose;
-    }
-
-    public byte getEscape() {
-        return escape;
-    }
-
-    public String getJsonRoot() {
-        return jsonRoot;
-    }
-
     public LoadTask.MergeType getMergeType() {
         return mergeType;
     }
 
+    public FileFormatProperties getFileFormatProperties() {
+        return fileFormatProperties;
+    }
+
     public AbstractDataSourceProperties getDataSourceProperties() {
         return dataSourceProperties;
     }
@@ -564,23 +518,6 @@ public class CreateRoutineLoadStmt extends DdlStmt 
implements NotFallbackInParse
                 RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
                 LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
 
-        String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
-        if (encloseStr != null) {
-            if (encloseStr.length() != 1) {
-                throw new AnalysisException("enclose must be single-char");
-            } else {
-                enclose = encloseStr.getBytes()[0];
-            }
-        }
-        String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
-        if (escapeStr != null) {
-            if (escapeStr.length() != 1) {
-                throw new AnalysisException("enclose must be single-char");
-            } else {
-                escape = escapeStr.getBytes()[0];
-            }
-        }
-
         String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
         if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
             ConnectContext tmpCtx = new ConnectContext();
@@ -603,23 +540,7 @@ public class CreateRoutineLoadStmt extends DdlStmt 
implements NotFallbackInParse
         }
         timezone = 
TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE,
 timezone));
 
-        format = jobProperties.get(FORMAT);
-        if (format != null) {
-            if (format.equalsIgnoreCase("csv")) {
-                format = ""; // if it's not json, then it's mean csv and set 
empty
-            } else if (format.equalsIgnoreCase("json")) {
-                format = "json";
-                jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
-                jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
-                stripOuterArray = 
Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
-                numAsString = 
Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
-                fuzzyParse = 
Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
-            } else {
-                throw new UserException("Format type is invalid. format=`" + 
format + "`");
-            }
-        } else {
-            format = "csv"; // default csv
-        }
+        fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
     }
 
     private void checkDataSourceProperties() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 744442955c8..6ff7ab6c5df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -731,7 +731,7 @@ public class OutFileClause {
     public String toSql() {
         StringBuilder sb = new StringBuilder();
         sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ")
-                .append(fileFormatProperties.getFileFormatType());
+                .append(fileFormatProperties.getFormatName());
         if (properties != null && !properties.isEmpty()) {
             sb.append(" PROPERTIES(");
             sb.append(new PrintableMap<>(properties, " = ", true, false));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index 7050bec9d77..2cd5852dea4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -20,10 +20,6 @@ package org.apache.doris.common.util;
 import java.util.regex.Pattern;
 
 public class FileFormatConstants {
-    public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
-    public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
-    public static final String DEFAULT_LINE_DELIMITER = "\n";
-
     public static final String FORMAT_CSV = "csv";
     public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
     public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = 
"csv_with_names_and_types";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
index 6d2b799ea00..7622cd19644 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
@@ -27,7 +27,7 @@ import java.util.Map;
 
 public class AvroFileFormatProperties extends FileFormatProperties {
     public AvroFileFormatProperties() {
-        super(TFileFormatType.FORMAT_AVRO);
+        super(TFileFormatType.FORMAT_AVRO, FileFormatProperties.FORMAT_AVRO);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
index 0efea98a5c3..097ea5ab5a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -53,6 +53,7 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
     public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
 
     public static final String PROP_ENCLOSE = "enclose";
+    public static final String PROP_ESCAPE = "escape";
 
     private String headerType = "";
     private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
@@ -62,24 +63,26 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
     private int skipLines;
     private byte enclose;
 
+    private byte escape;
+
     // used by tvf
     // User specified csv columns, it will override columns got from file
     private final List<Column> csvSchema = Lists.newArrayList();
 
     String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
 
-    public CsvFileFormatProperties() {
-        super(TFileFormatType.FORMAT_CSV_PLAIN);
+    public CsvFileFormatProperties(String formatName) {
+        super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
     }
 
-    public CsvFileFormatProperties(String defaultColumnSeparator, 
TTextSerdeType textSerdeType) {
-        super(TFileFormatType.FORMAT_CSV_PLAIN);
+    public CsvFileFormatProperties(String defaultColumnSeparator, 
TTextSerdeType textSerdeType, String formatName) {
+        super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
         this.defaultColumnSeparator = defaultColumnSeparator;
         this.textSerdeType = textSerdeType;
     }
 
-    public CsvFileFormatProperties(String headerType) {
-        super(TFileFormatType.FORMAT_CSV_PLAIN);
+    public CsvFileFormatProperties(String headerType, String formatName) {
+        super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
         this.headerType = headerType;
     }
 
@@ -115,6 +118,16 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
                 }
             }
 
+            String escapeStr = getOrDefault(formatProperties, PROP_ESCAPE,
+                    "", isRemoveOriginProperty);
+            if (!Strings.isNullOrEmpty(escapeStr)) {
+                if (escapeStr.length() != 1) {
+                    throw new AnalysisException("escape must be single-char");
+                } else {
+                    escape = escapeStr.getBytes()[0];
+                }
+            }
+
             trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties,
                     PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty))
                     .booleanValue();
@@ -186,6 +199,10 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
         return enclose;
     }
 
+    public byte getEscape() {
+        return escape;
+    }
+
     public List<Column> getCsvSchema() {
         return csvSchema;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
index bd0ecc214c6..81cf090fa22 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -40,12 +40,14 @@ public abstract class FileFormatProperties {
     public static final String FORMAT_ARROW = "arrow";
     public static final String PROP_COMPRESS_TYPE = "compress_type";
 
+    protected String formatName;
     protected TFileFormatType fileFormatType;
 
     protected TFileCompressType compressionType;
 
-    public FileFormatProperties(TFileFormatType fileFormatType) {
+    public FileFormatProperties(TFileFormatType fileFormatType, String 
formatName) {
         this.fileFormatType = fileFormatType;
+        this.formatName = formatName;
     }
 
     /**
@@ -73,16 +75,14 @@ public abstract class FileFormatProperties {
     public static FileFormatProperties createFileFormatProperties(String 
formatString) {
         switch (formatString) {
             case FORMAT_CSV:
-                return new CsvFileFormatProperties();
+                return new CsvFileFormatProperties(formatString);
             case FORMAT_HIVE_TEXT:
                 return new 
CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR,
-                        TTextSerdeType.HIVE_TEXT_SERDE);
+                        TTextSerdeType.HIVE_TEXT_SERDE, formatString);
             case FORMAT_CSV_WITH_NAMES:
-                return new CsvFileFormatProperties(
-                        FORMAT_CSV_WITH_NAMES);
+                return new CsvFileFormatProperties(FORMAT_CSV_WITH_NAMES, 
formatString);
             case FORMAT_CSV_WITH_NAMES_AND_TYPES:
-                return new CsvFileFormatProperties(
-                        FORMAT_CSV_WITH_NAMES_AND_TYPES);
+                return new 
CsvFileFormatProperties(FORMAT_CSV_WITH_NAMES_AND_TYPES, formatString);
             case FORMAT_PARQUET:
                 return new ParquetFileFormatProperties();
             case FORMAT_ORC:
@@ -121,4 +121,8 @@ public abstract class FileFormatProperties {
     public TFileCompressType getCompressionType() {
         return compressionType;
     }
+
+    public String getFormatName() {
+        return formatName;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
index 3431d366f8b..238844bee22 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
@@ -37,14 +37,14 @@ public class JsonFileFormatProperties extends 
FileFormatProperties {
     // from ExternalFileTableValuedFunction:
     private String jsonRoot = "";
     private String jsonPaths = "";
-    private boolean stripOuterArray;
+    private boolean stripOuterArray = false;
     private boolean readJsonByLine;
-    private boolean numAsString;
-    private boolean fuzzyParse;
+    private boolean numAsString = false;
+    private boolean fuzzyParse = false;
 
 
     public JsonFileFormatProperties() {
-        super(TFileFormatType.FORMAT_JSON);
+        super(TFileFormatType.FORMAT_JSON, FileFormatProperties.FORMAT_JSON);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
index 412c3d237e8..ac88b225181 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
@@ -41,7 +41,7 @@ public class OrcFileFormatProperties extends 
FileFormatProperties {
     private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
 
     public OrcFileFormatProperties() {
-        super(TFileFormatType.FORMAT_ORC);
+        super(TFileFormatType.FORMAT_ORC, FileFormatProperties.FORMAT_ORC);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
index 8063b25964a..18d1484e596 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
@@ -62,7 +62,7 @@ public class ParquetFileFormatProperties extends 
FileFormatProperties {
     private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
 
     public ParquetFileFormatProperties() {
-        super(TFileFormatType.FORMAT_PARQUET);
+        super(TFileFormatType.FORMAT_PARQUET, 
FileFormatProperties.FORMAT_PARQUET);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
index 0c6b1777cf6..af8dfd5b70f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
@@ -27,7 +27,7 @@ import java.util.Map;
 
 public class WalFileFormatProperties extends FileFormatProperties {
     public WalFileFormatProperties() {
-        super(TFileFormatType.FORMAT_WAL);
+        super(TFileFormatType.FORMAT_WAL, FileFormatProperties.FORMAT_WAL);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a363d50b505..af4a977840a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -48,6 +48,9 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
+import 
org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
@@ -227,20 +230,6 @@ public abstract class RoutineLoadJob
 
     protected boolean memtableOnSinkNode = false;
 
-    /**
-     * RoutineLoad support json data.
-     * Require Params:
-     * 1) format = "json"
-     * 2) jsonPath = "$.XXX.xxx"
-     */
-    private static final String PROPS_FORMAT = "format";
-    private static final String PROPS_STRIP_OUTER_ARRAY = "strip_outer_array";
-    private static final String PROPS_NUM_AS_STRING = "num_as_string";
-    private static final String PROPS_JSONPATHS = "jsonpaths";
-    private static final String PROPS_JSONROOT = "json_root";
-    private static final String PROPS_FUZZY_PARSE = "fuzzy_parse";
-
-
     protected int currentTaskConcurrentNum;
     @SerializedName("pg")
     protected RoutineLoadProgress progress;
@@ -395,47 +384,30 @@ public abstract class RoutineLoadJob
             this.isPartialUpdate = true;
         }
         jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, 
String.valueOf(maxFilterRatio));
-        if (Strings.isNullOrEmpty(stmt.getFormat()) || 
stmt.getFormat().equals("csv")) {
-            jobProperties.put(PROPS_FORMAT, "csv");
-        } else if (stmt.getFormat().equals("json")) {
-            jobProperties.put(PROPS_FORMAT, "json");
+
+        FileFormatProperties fileFormatProperties = 
stmt.getFileFormatProperties();
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
+            CsvFileFormatProperties csvFileFormatProperties = 
(CsvFileFormatProperties) fileFormatProperties;
+            jobProperties.put(FileFormatProperties.PROP_FORMAT, "csv");
+            jobProperties.put(LoadStmt.KEY_ENCLOSE, 
String.valueOf(csvFileFormatProperties.getEnclose()));
+            jobProperties.put(LoadStmt.KEY_ESCAPE, 
String.valueOf(csvFileFormatProperties.getEscape()));
+            this.enclose = csvFileFormatProperties.getEnclose();
+            this.escape = csvFileFormatProperties.getEscape();
+        } else if (fileFormatProperties instanceof JsonFileFormatProperties) {
+            JsonFileFormatProperties jsonFileFormatProperties = 
(JsonFileFormatProperties) fileFormatProperties;
+            jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
+            jobProperties.put(JsonFileFormatProperties.PROP_JSON_PATHS, 
jsonFileFormatProperties.getJsonPaths());
+            jobProperties.put(JsonFileFormatProperties.PROP_JSON_ROOT, 
jsonFileFormatProperties.getJsonRoot());
+            jobProperties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
+                    
String.valueOf(jsonFileFormatProperties.isStripOuterArray()));
+            jobProperties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING,
+                    String.valueOf(jsonFileFormatProperties.isNumAsString()));
+            jobProperties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE,
+                    String.valueOf(jsonFileFormatProperties.isFuzzyParse()));
         } else {
             throw new UserException("Invalid format type.");
         }
 
-        if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) {
-            jobProperties.put(PROPS_JSONPATHS, stmt.getJsonPaths());
-        } else {
-            jobProperties.put(PROPS_JSONPATHS, "");
-        }
-        if (!Strings.isNullOrEmpty(stmt.getJsonRoot())) {
-            jobProperties.put(PROPS_JSONROOT, stmt.getJsonRoot());
-        } else {
-            jobProperties.put(PROPS_JSONROOT, "");
-        }
-        if (stmt.isStripOuterArray()) {
-            jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "true");
-        } else {
-            jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false");
-        }
-        if (stmt.isNumAsString()) {
-            jobProperties.put(PROPS_NUM_AS_STRING, "true");
-        } else {
-            jobProperties.put(PROPS_NUM_AS_STRING, "false");
-        }
-        if (stmt.isFuzzyParse()) {
-            jobProperties.put(PROPS_FUZZY_PARSE, "true");
-        } else {
-            jobProperties.put(PROPS_FUZZY_PARSE, "false");
-        }
-        if (String.valueOf(stmt.getEnclose()) != null) {
-            this.enclose = stmt.getEnclose();
-            jobProperties.put(LoadStmt.KEY_ENCLOSE, 
String.valueOf(stmt.getEnclose()));
-        }
-        if (String.valueOf(stmt.getEscape()) != null) {
-            this.escape = stmt.getEscape();
-            jobProperties.put(LoadStmt.KEY_ESCAPE, 
String.valueOf(stmt.getEscape()));
-        }
         if (!StringUtils.isEmpty(stmt.getWorkloadGroupName())) {
             jobProperties.put(WORKLOAD_GROUP, stmt.getWorkloadGroupName());
         }
@@ -683,7 +655,7 @@ public abstract class RoutineLoadJob
     }
 
     public String getFormat() {
-        String value = jobProperties.get(PROPS_FORMAT);
+        String value = jobProperties.get(FileFormatProperties.PROP_FORMAT);
         if (value == null) {
             return "csv";
         }
@@ -692,17 +664,17 @@ public abstract class RoutineLoadJob
 
     @Override
     public boolean isStripOuterArray() {
-        return 
Boolean.parseBoolean(jobProperties.get(PROPS_STRIP_OUTER_ARRAY));
+        return 
Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY));
     }
 
     @Override
     public boolean isNumAsString() {
-        return Boolean.parseBoolean(jobProperties.get(PROPS_NUM_AS_STRING));
+        return 
Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_NUM_AS_STRING));
     }
 
     @Override
     public boolean isFuzzyParse() {
-        return Boolean.parseBoolean(jobProperties.get(PROPS_FUZZY_PARSE));
+        return 
Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_FUZZY_PARSE));
     }
 
     @Override
@@ -750,7 +722,7 @@ public abstract class RoutineLoadJob
     }
 
     public String getJsonPaths() {
-        String value = jobProperties.get(PROPS_JSONPATHS);
+        String value = 
jobProperties.get(JsonFileFormatProperties.PROP_JSON_PATHS);
         if (value == null) {
             return "";
         }
@@ -758,7 +730,7 @@ public abstract class RoutineLoadJob
     }
 
     public String getJsonRoot() {
-        String value = jobProperties.get(PROPS_JSONROOT);
+        String value = 
jobProperties.get(JsonFileFormatProperties.PROP_JSON_ROOT);
         if (value == null) {
             return "";
         }
@@ -1808,15 +1780,15 @@ public abstract class RoutineLoadJob
         appendProperties(sb, 
CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, 
false);
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, 
maxBatchRows, false);
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, 
maxBatchSizeBytes, false);
-        appendProperties(sb, PROPS_FORMAT, getFormat(), false);
+        appendProperties(sb, FileFormatProperties.PROP_FORMAT, getFormat(), 
false);
         if (isPartialUpdate) {
             appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, 
isPartialUpdate, false);
         }
-        appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
-        appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), 
false);
-        appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
-        appendProperties(sb, PROPS_FUZZY_PARSE, isFuzzyParse(), false);
-        appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), false);
+        appendProperties(sb, JsonFileFormatProperties.PROP_JSON_PATHS, 
getJsonPaths(), false);
+        appendProperties(sb, JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, 
isStripOuterArray(), false);
+        appendProperties(sb, JsonFileFormatProperties.PROP_NUM_AS_STRING, 
isNumAsString(), false);
+        appendProperties(sb, JsonFileFormatProperties.PROP_FUZZY_PARSE, 
isFuzzyParse(), false);
+        appendProperties(sb, JsonFileFormatProperties.PROP_JSON_ROOT, 
getJsonRoot(), false);
         appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
         appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
         appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true);
@@ -1890,7 +1862,7 @@ public abstract class RoutineLoadJob
         jobProperties.put("precedingFilter", precedingFilter == null ? 
STAR_STRING : precedingFilter.toSql());
         jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : 
whereExpr.toSql());
         if (getFormat().equalsIgnoreCase("json")) {
-            jobProperties.put(PROPS_FORMAT, "json");
+            jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
         } else {
             jobProperties.put(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR,
                     columnSeparator == null ? "\t" : 
columnSeparator.toString());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index daaed3215c2..eecbbf871b0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -38,6 +38,9 @@ import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
+import 
org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@@ -86,12 +89,6 @@ public class CreateRoutineLoadInfo {
     public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size";
     public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
 
-    public static final String FORMAT = "format"; // the value is csv or json, 
default is csv
-    public static final String STRIP_OUTER_ARRAY = "strip_outer_array";
-    public static final String JSONPATHS = "jsonpaths";
-    public static final String JSONROOT = "json_root";
-    public static final String NUM_AS_STRING = "num_as_string";
-    public static final String FUZZY_PARSE = "fuzzy_parse";
     public static final String PARTIAL_COLUMNS = "partial_columns";
     public static final String WORKLOAD_GROUP = "workload_group";
     public static final String ENDPOINT_REGEX = 
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
@@ -116,12 +113,6 @@ public class CreateRoutineLoadInfo {
             .add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
             .add(MAX_BATCH_ROWS_PROPERTY)
             .add(MAX_BATCH_SIZE_PROPERTY)
-            .add(FORMAT)
-            .add(JSONPATHS)
-            .add(STRIP_OUTER_ARRAY)
-            .add(NUM_AS_STRING)
-            .add(FUZZY_PARSE)
-            .add(JSONROOT)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
             .add(EXEC_MEM_LIMIT_PROPERTY)
@@ -129,8 +120,14 @@ public class CreateRoutineLoadInfo {
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
             .add(WORKLOAD_GROUP)
-            .add(LoadStmt.KEY_ENCLOSE)
-            .add(LoadStmt.KEY_ESCAPE)
+            .add(FileFormatProperties.PROP_FORMAT)
+            .add(JsonFileFormatProperties.PROP_JSON_PATHS)
+            .add(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY)
+            .add(JsonFileFormatProperties.PROP_NUM_AS_STRING)
+            .add(JsonFileFormatProperties.PROP_FUZZY_PARSE)
+            .add(JsonFileFormatProperties.PROP_JSON_ROOT)
+            .add(CsvFileFormatProperties.PROP_ENCLOSE)
+            .add(CsvFileFormatProperties.PROP_ESCAPE)
             .build();
 
     private static final Logger LOG = 
LogManager.getLogger(CreateRoutineLoadInfo.class);
@@ -157,22 +154,7 @@ public class CreateRoutineLoadInfo {
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
     private int sendBatchParallelism = 1;
     private boolean loadToSingleTablet = false;
-    /**
-     * RoutineLoad support json data.
-     * Require Params:
-     * 1) dataFormat = "json"
-     * 2) jsonPaths = "$.XXX.xxx"
-     */
-    private String format = ""; //default is csv.
-    private String jsonPaths = "";
-    private String jsonRoot = ""; // MUST be a jsonpath string
-    private boolean stripOuterArray = false;
-    private boolean numAsString = false;
-    private boolean fuzzyParse = false;
-
-    private byte enclose;
-
-    private byte escape;
+    private FileFormatProperties fileFormatProperties;
 
     private String workloadGroupName;
 
@@ -392,23 +374,6 @@ public class CreateRoutineLoadInfo {
             RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
             LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
 
-        String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
-        if (encloseStr != null) {
-            if (encloseStr.length() != 1) {
-                throw new AnalysisException("enclose must be single-char");
-            } else {
-                enclose = encloseStr.getBytes()[0];
-            }
-        }
-        String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
-        if (escapeStr != null) {
-            if (escapeStr.length() != 1) {
-                throw new AnalysisException("enclose must be single-char");
-            } else {
-                escape = escapeStr.getBytes()[0];
-            }
-        }
-
         String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
         if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
             ConnectContext tmpCtx = new ConnectContext();
@@ -431,23 +396,9 @@ public class CreateRoutineLoadInfo {
         }
         timezone = 
TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE,
 timezone));
 
-        format = jobProperties.get(FORMAT);
-        if (format != null) {
-            if (format.equalsIgnoreCase("csv")) {
-                format = ""; // if it's not json, then it's mean csv and set 
empty
-            } else if (format.equalsIgnoreCase("json")) {
-                format = "json";
-                jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
-                jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
-                stripOuterArray = 
Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
-                numAsString = 
Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
-                fuzzyParse = 
Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
-            } else {
-                throw new UserException("Format type is invalid. format=`" + 
format + "`");
-            }
-        } else {
-            format = "csv"; // default csv
-        }
+        String format = 
jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        fileFormatProperties = 
FileFormatProperties.createFileFormatProperties(format);
+        fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
     }
 
     private void checkDataSourceProperties() throws UserException {
@@ -461,13 +412,11 @@ public class CreateRoutineLoadInfo {
      */
     public CreateRoutineLoadStmt translateToLegacyStmt(ConnectContext ctx) {
         return new CreateRoutineLoadStmt(labelNameInfo.transferToLabelName(), 
dbName, name, tableName, null,
-                ctx.getStatementContext().getOriginStatement(), 
ctx.getCurrentUserIdentity(),
-                jobProperties, typeName, routineLoadDesc,
-                desiredConcurrentNum, maxErrorNum, maxFilterRatio, 
maxBatchIntervalS, maxBatchRows, maxBatchSizeBytes,
-                execMemLimit, sendBatchParallelism, timezone, format, 
jsonPaths, jsonRoot, enclose, escape,
-                workloadGroupName,
-                loadToSingleTablet, strictMode, isPartialUpdate, 
stripOuterArray, numAsString, fuzzyParse,
-                dataSourceProperties
+            ctx.getStatementContext().getOriginStatement(), 
ctx.getCurrentUserIdentity(),
+            jobProperties, typeName, routineLoadDesc,
+            desiredConcurrentNum, maxErrorNum, maxFilterRatio, 
maxBatchIntervalS, maxBatchRows, maxBatchSizeBytes,
+            execMemLimit, sendBatchParallelism, timezone, workloadGroupName, 
loadToSingleTablet,
+            strictMode, isPartialUpdate, dataSourceProperties, 
fileFormatProperties
         );
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
index a496378b5e5..4b2550cfa52 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
@@ -33,7 +33,7 @@ public class CsvFileFormatPropertiesTest {
 
     @Before
     public void setUp() {
-        csvFileFormatProperties = new CsvFileFormatProperties();
+        csvFileFormatProperties = new CsvFileFormatProperties("csv");
     }
 
     @Test
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 5603b9dd25a..82f0367772c 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -258,7 +258,7 @@ suite("test_routine_load","p0") {
                         continue;
                     }
                     log.info("reason of state changed: 
${res[0][17].toString()}".toString())
-                    assertEquals(res[0][8].toString(), "RUNNING")
+                    assertEquals("RUNNING", res[0][8].toString())
                     break;
                 }
 
@@ -1301,7 +1301,7 @@ suite("test_routine_load","p0") {
                     sql "sync"
                 }catch (Exception e) {
                     log.info("create routine load failed: ${e.getMessage()}")
-                    assertEquals(e.getMessage(), "errCode = 2, detailMessage = 
Format type is invalid. format=`test`")
+                    assertEquals(e.getMessage(), "errCode = 2, detailMessage = 
format:test is not supported.")
                 }
                 i++
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to