This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 8b5967275af branch-3.1: [feat](file-format) unify file format properties for tvf and outfile #50225 #50463 #50471 (#52101)
8b5967275af is described below
commit 8b5967275af55939a72e36caef69b6b309bccfcd
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Jun 23 09:57:35 2025 +0800
branch-3.1: [feat](file-format) unify file format properties for tvf and outfile #50225 #50463 #50471 (#52101)
bp #50225 #50463 #50471
---------
Co-authored-by: Tiewei Fang <[email protected]>
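
For context, a minimal sketch of the unified flow this patch introduces (class and
method names are taken from the new files below; the property values and surrounding
variables such as `props` and `filePath` are illustrative only):

    // Resolve the format string to a properties object, analyze user-supplied
    // properties (consuming recognized keys), then feed the result to a sink or scan.
    Map<String, String> props = Maps.newHashMap();
    props.put("column_separator", ",");           // hypothetical user input
    FileFormatProperties ffp = FileFormatProperties.createFileFormatProperties("csv");
    ffp.analyzeFileFormatProperties(props, true); // true: remove consumed keys
    // OUTFILE path: fill the sink options.
    TResultFileSinkOptions sinkOptions =
            new TResultFileSinkOptions(filePath, ffp.getFileFormatType());
    ffp.fullTResultFileSinkOptions(sinkOptions);
    // TVF path: build the scan attributes instead.
    TFileAttributes attrs = ffp.toTFileAttributes();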
---
.../org/apache/doris/analysis/OutFileClause.java | 292 ++++++---------------
.../java/org/apache/doris/common/util/Util.java | 6 +-
.../fileformat/AvroFileFormatProperties.java | 49 ++++
.../fileformat/CsvFileFormatProperties.java | 192 ++++++++++++++
.../property/fileformat/FileFormatProperties.java | 124 +++++++++
.../fileformat/JsonFileFormatProperties.java | 117 +++++++++
.../fileformat/OrcFileFormatProperties.java | 83 ++++++
.../fileformat/ParquetFileFormatProperties.java | 129 +++++++++
.../fileformat/WalFileFormatProperties.java | 49 ++++
.../org/apache/doris/planner/ResultFileSink.java | 11 +-
.../ExternalFileTableValuedFunction.java | 159 ++---------
.../HttpStreamTableValuedFunction.java | 6 +-
.../fileformat/AvroFileFormatPropertiesTest.java | 42 +++
.../fileformat/CsvFileFormatPropertiesTest.java | 224 ++++++++++++++++
.../fileformat/FileFormatPropertiesTest.java | 34 +++
.../fileformat/JsonFileFormatPropertiesTest.java | 199 ++++++++++++++
.../fileformat/OrcFileFormatPropertiesTest.java | 97 +++++++
.../ParquetFileFormatPropertiesTest.java | 139 ++++++++++
.../fileformat/WalFileFormatPropertiesTest.java | 41 +++
19 files changed, 1642 insertions(+), 351 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index adf601da1ea..744442955c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -30,19 +30,16 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TParquetCompressionType;
import org.apache.doris.thrift.TParquetDataType;
import org.apache.doris.thrift.TParquetRepetitionType;
import org.apache.doris.thrift.TParquetSchema;
-import org.apache.doris.thrift.TParquetVersion;
import org.apache.doris.thrift.TResultFileSinkOptions;
import com.google.common.base.Preconditions;
@@ -61,7 +58,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
// For syntax select * from tbl INTO OUTFILE xxxx
public class OutFileClause {
@@ -71,9 +67,6 @@ public class OutFileClause {
public static final List<Type> RESULT_COL_TYPES = Lists.newArrayList();
public static final Map<String, TParquetRepetitionType>
PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP =
Maps.newHashMap();
- public static final Map<String, TParquetCompressionType>
PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
- public static final Map<String, TFileCompressType>
ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
- public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP =
Maps.newHashMap();
public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
public static final String FILE_NUMBER = "FileNumber";
public static final String TOTAL_ROWS = "TotalRows";
@@ -110,24 +103,6 @@ public class OutFileClause {
PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array",
TParquetDataType.FIXED_LEN_BYTE_ARRAY);
- PARQUET_COMPRESSION_TYPE_MAP.put("snappy",
TParquetCompressionType.SNAPPY);
- PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
- PARQUET_COMPRESSION_TYPE_MAP.put("brotli",
TParquetCompressionType.BROTLI);
- PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
- PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
- // arrow do not support lzo and bz2 compression type.
- // PARQUET_COMPRESSION_TYPE_MAP.put("lzo",
TParquetCompressionType.LZO);
- // PARQUET_COMPRESSION_TYPE_MAP.put("bz2",
TParquetCompressionType.BZ2);
- PARQUET_COMPRESSION_TYPE_MAP.put("plain",
TParquetCompressionType.UNCOMPRESSED);
-
- ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
- ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
- ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
- ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
-
- PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
- PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
-
ORC_DATA_TYPE.add("bigint");
ORC_DATA_TYPE.add("boolean");
ORC_DATA_TYPE.add("double");
@@ -152,9 +127,7 @@ public class OutFileClause {
public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
public static final String PROP_FILE_SUFFIX = "file_suffix";
public static final String PROP_WITH_BOM = "with_bom";
- public static final String COMPRESS_TYPE = "compress_type";
- private static final String PARQUET_PROP_PREFIX = "parquet.";
private static final String SCHEMA = "schema";
private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
@@ -162,13 +135,8 @@ public class OutFileClause {
private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB
private String filePath;
- private String format;
private Map<String, String> properties;
- // set following members after analyzing
- private String columnSeparator = "\t";
- private String lineDelimiter = "\n";
- private TFileFormatType fileFormatType;
private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
private boolean deleteExistingFiles = false;
private String fileSuffix = "";
@@ -184,43 +152,41 @@ public class OutFileClause {
private List<Pair<String, String>> orcSchemas = new ArrayList<>();
private boolean isAnalyzed = false;
- private String headerType = "";
- private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
- private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
- private static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
- private boolean parquetDisableDictionary = false;
- private static final String PARQUET_VERSION = "version";
- private static TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+ private FileFormatProperties fileFormatProperties;
public OutFileClause(String filePath, String format, Map<String, String> properties) {
this.filePath = filePath;
- this.format = Strings.isNullOrEmpty(format) ? "csv" : format.toLowerCase();
this.properties = properties;
this.isAnalyzed = false;
+ if (Strings.isNullOrEmpty(format)) {
+ fileFormatProperties = FileFormatProperties.createFileFormatProperties("csv");
+ } else {
+ fileFormatProperties = FileFormatProperties.createFileFormatProperties(format.toLowerCase());
+ }
}
public OutFileClause(OutFileClause other) {
this.filePath = other.filePath;
- this.format = other.format;
+ this.fileFormatProperties = other.fileFormatProperties;
this.properties = other.properties == null ? null : Maps.newHashMap(other.properties);
this.isAnalyzed = other.isAnalyzed;
}
public String getColumnSeparator() {
- return columnSeparator;
+ return ((CsvFileFormatProperties) fileFormatProperties).getColumnSeparator();
}
public String getLineDelimiter() {
- return lineDelimiter;
+ return ((CsvFileFormatProperties) fileFormatProperties).getLineDelimiter();
}
public String getHeaderType() {
- return headerType;
+ return ((CsvFileFormatProperties) fileFormatProperties).getHeaderType();
}
public TFileFormatType getFileFormatType() {
- return fileFormatType;
+ return fileFormatProperties.getFileFormatType();
}
public long getMaxFileSizeBytes() {
@@ -245,28 +211,6 @@ public class OutFileClause {
}
analyzeFilePath();
- switch (this.format) {
- case "csv":
- fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
- break;
- case "parquet":
- fileFormatType = TFileFormatType.FORMAT_PARQUET;
- break;
- case "orc":
- fileFormatType = TFileFormatType.FORMAT_ORC;
- break;
- case "csv_with_names":
- headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
- fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
- break;
- case "csv_with_names_and_types":
- headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
- fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
- break;
- default:
- throw new AnalysisException("format:" + this.format + " is not supported.");
- }
-
analyzeProperties();
if (brokerDesc != null && isLocalOutput) {
@@ -559,76 +503,60 @@ public class OutFileClause {
if (properties == null || properties.isEmpty()) {
return;
}
+ // Copy the properties, because we will remove the key from properties.
+ Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ copiedProps.putAll(properties);
- Set<String> processedPropKeys = Sets.newHashSet();
- analyzeBrokerDesc(processedPropKeys);
+ analyzeBrokerDesc(copiedProps);
- if (properties.containsKey(PROP_COLUMN_SEPARATOR)) {
- if (!Util.isCsvFormat(fileFormatType)) {
- throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format");
- }
- columnSeparator = Separator.convertSeparator(properties.get(PROP_COLUMN_SEPARATOR));
- processedPropKeys.add(PROP_COLUMN_SEPARATOR);
- }
-
- if (properties.containsKey(PROP_LINE_DELIMITER)) {
- if (!Util.isCsvFormat(fileFormatType)) {
- throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format");
- }
- lineDelimiter = Separator.convertSeparator(properties.get(PROP_LINE_DELIMITER));
- processedPropKeys.add(PROP_LINE_DELIMITER);
- }
+ fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
- if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
- maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE));
+ if (copiedProps.containsKey(PROP_MAX_FILE_SIZE)) {
+ maxFileSizeBytes = ParseUtil.analyzeDataVolume(copiedProps.get(PROP_MAX_FILE_SIZE));
if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
}
- processedPropKeys.add(PROP_MAX_FILE_SIZE);
+ copiedProps.remove(PROP_MAX_FILE_SIZE);
}
- if (properties.containsKey(PROP_DELETE_EXISTING_FILES)) {
- deleteExistingFiles = Boolean.parseBoolean(properties.get(PROP_DELETE_EXISTING_FILES))
+ if (copiedProps.containsKey(PROP_DELETE_EXISTING_FILES)) {
+ deleteExistingFiles = Boolean.parseBoolean(copiedProps.get(PROP_DELETE_EXISTING_FILES))
& Config.enable_delete_existing_files;
- processedPropKeys.add(PROP_DELETE_EXISTING_FILES);
+ copiedProps.remove(PROP_DELETE_EXISTING_FILES);
}
- if (properties.containsKey(PROP_FILE_SUFFIX)) {
- fileSuffix = properties.get(PROP_FILE_SUFFIX);
- processedPropKeys.add(PROP_FILE_SUFFIX);
+ if (copiedProps.containsKey(PROP_FILE_SUFFIX)) {
+ fileSuffix = copiedProps.get(PROP_FILE_SUFFIX);
+ copiedProps.remove(PROP_FILE_SUFFIX);
}
- if (properties.containsKey(PROP_WITH_BOM)) {
- withBom = Boolean.valueOf(properties.get(PROP_WITH_BOM)).booleanValue();
- processedPropKeys.add(PROP_WITH_BOM);
+ if (copiedProps.containsKey(PROP_WITH_BOM)) {
+ withBom = Boolean.valueOf(copiedProps.get(PROP_WITH_BOM)).booleanValue();
+ copiedProps.remove(PROP_WITH_BOM);
}
- if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
- successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
+ if (copiedProps.containsKey(PROP_SUCCESS_FILE_NAME)) {
+ successFileName = copiedProps.get(PROP_SUCCESS_FILE_NAME);
FeNameFormat.checkOutfileSuccessFileName("file name", successFileName);
- processedPropKeys.add(PROP_SUCCESS_FILE_NAME);
+ copiedProps.remove(PROP_SUCCESS_FILE_NAME);
}
// For Azure compatibility, this is temporarily added to the map without further processing.
// The validity of each provider's value will be checked later in S3Properties' check.
- if (properties.containsKey(S3Properties.PROVIDER)) {
- processedPropKeys.add(S3Properties.PROVIDER);
+ if (copiedProps.containsKey(S3Properties.PROVIDER)) {
+ copiedProps.remove(S3Properties.PROVIDER);
}
- if (this.fileFormatType == TFileFormatType.FORMAT_PARQUET) {
- getParquetProperties(processedPropKeys);
+ if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET) {
+ getParquetProperties(copiedProps);
}
- if (this.fileFormatType == TFileFormatType.FORMAT_ORC) {
- getOrcProperties(processedPropKeys);
+ if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC) {
+ getOrcProperties(copiedProps);
}
- if (processedPropKeys.size() != properties.size()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} vs {}", processedPropKeys, properties);
- }
- throw new AnalysisException("Unknown properties: " +
properties.keySet().stream()
- .filter(k ->
!processedPropKeys.contains(k)).collect(Collectors.toList()));
+ if (!copiedProps.isEmpty()) {
+ throw new AnalysisException("Unknown properties: " +
copiedProps.keySet());
}
}
@@ -637,11 +565,11 @@ public class OutFileClause {
* 1. broker: with broker name
* 2. s3: with s3 pattern path, without broker name
*/
- private void analyzeBrokerDesc(Set<String> processedPropKeys) throws UserException {
- String brokerName = properties.get(PROP_BROKER_NAME);
+ private void analyzeBrokerDesc(Map<String, String> copiedProps) throws UserException {
+ String brokerName = copiedProps.get(PROP_BROKER_NAME);
+ String brokerName = copiedProps.get(PROP_BROKER_NAME);
StorageBackend.StorageType storageType;
- if (properties.containsKey(PROP_BROKER_NAME)) {
- processedPropKeys.add(PROP_BROKER_NAME);
+ if (copiedProps.containsKey(PROP_BROKER_NAME)) {
+ copiedProps.remove(PROP_BROKER_NAME);
storageType = StorageBackend.StorageType.BROKER;
} else if (filePath.toUpperCase().startsWith(S3_FILE_PREFIX)) {
brokerName = StorageBackend.StorageType.S3.name();
@@ -654,29 +582,32 @@ public class OutFileClause {
}
Map<String, String> brokerProps = Maps.newHashMap();
- for (Map.Entry<String, String> entry : properties.entrySet()) {
+ Iterator<Map.Entry<String, String>> iterator = copiedProps.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, String> entry = iterator.next();
if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
- processedPropKeys.add(entry.getKey());
+ iterator.remove();
} else if (entry.getKey().toLowerCase().startsWith(S3Properties.S3_PREFIX)
|| entry.getKey().toUpperCase().startsWith(S3Properties.Env.PROPERTIES_PREFIX)) {
brokerProps.put(entry.getKey(), entry.getValue());
- processedPropKeys.add(entry.getKey());
+ iterator.remove();
} else if (entry.getKey().contains(HdfsResource.HADOOP_FS_NAME)
&& storageType == StorageBackend.StorageType.HDFS) {
brokerProps.put(entry.getKey(), entry.getValue());
- processedPropKeys.add(entry.getKey());
+ iterator.remove();
} else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX)
|| entry.getKey().startsWith(HADOOP_PROP_PREFIX))
&& storageType == StorageBackend.StorageType.HDFS) {
brokerProps.put(entry.getKey(), entry.getValue());
- processedPropKeys.add(entry.getKey());
+ iterator.remove();
}
}
+
if (storageType == StorageBackend.StorageType.S3) {
- if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
- brokerProps.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
- processedPropKeys.add(PropertyConverter.USE_PATH_STYLE);
+ if (copiedProps.containsKey(PropertyConverter.USE_PATH_STYLE)) {
+ brokerProps.put(PropertyConverter.USE_PATH_STYLE, copiedProps.get(PropertyConverter.USE_PATH_STYLE));
+ copiedProps.remove(PropertyConverter.USE_PATH_STYLE);
}
S3Properties.requiredS3Properties(brokerProps);
} else if (storageType == StorageBackend.StorageType.HDFS) {
@@ -694,14 +625,6 @@ public class OutFileClause {
return fullPath.replace(filePath, "");
}
- void setParquetVersion(String propertyValue) {
- if (PARQUET_VERSION_MAP.containsKey(propertyValue)) {
- this.parquetVersion = PARQUET_VERSION_MAP.get(propertyValue);
- } else {
- LOG.debug("not set parquet version type or is invalid, set default
to PARQUET_1.0 version.");
- }
- }
-
/**
* example:
* SELECT citycode FROM table1 INTO OUTFILE "file:///root/doris/"
@@ -713,36 +636,10 @@ public class OutFileClause {
* prefix with 'parquet.' defines the properties of parquet file,
* currently only supports: compression, disable_dictionary, version
*/
- private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
- // save compress type
- if (properties.containsKey(COMPRESS_TYPE)) {
- if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
- this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
- properties.get(COMPRESS_TYPE).toLowerCase());
- processedPropKeys.add(COMPRESS_TYPE);
- } else {
- throw new AnalysisException("parquet compression type [" + properties.get(COMPRESS_TYPE)
- + "] is invalid, please choose one among SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN");
- }
- }
-
- // save all parquet prefix property
- Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, String> entry = iter.next();
- if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
- processedPropKeys.add(entry.getKey());
- if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY)) {
- this.parquetDisableDictionary = Boolean.valueOf(entry.getValue());
- } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_VERSION)) {
- setParquetVersion(entry.getValue());
- }
- }
- }
-
+ private void getParquetProperties(Map<String, String> copiedProps) throws AnalysisException {
// check schema. if schema is not set, Doris will gen schema by select items
// Note: These codes are useless and outdated.
- String schema = properties.get(SCHEMA);
+ String schema = copiedProps.get(SCHEMA);
if (schema == null) {
return;
}
@@ -753,43 +650,31 @@ public class OutFileClause {
schema = schema.toLowerCase();
String[] schemas = schema.split(";");
for (String item : schemas) {
- String[] properties = item.split(",");
- if (properties.length != 3) {
+ String[] fields = item.split(",");
+ if (fields.length != 3) {
throw new AnalysisException("must only contains repetition
type/column type/column name");
}
- if (!PARQUET_REPETITION_TYPE_MAP.containsKey(properties[0])) {
+ if (!PARQUET_REPETITION_TYPE_MAP.containsKey(fields[0])) {
throw new AnalysisException("unknown repetition type");
}
- if (!properties[0].equalsIgnoreCase("required")) {
+ if (!fields[0].equalsIgnoreCase("required")) {
throw new AnalysisException("currently only support required
type");
}
- if (!PARQUET_DATA_TYPE_MAP.containsKey(properties[1])) {
- throw new AnalysisException("data type is not supported:" +
properties[1]);
+ if (!PARQUET_DATA_TYPE_MAP.containsKey(fields[1])) {
+ throw new AnalysisException("data type is not supported:" +
fields[1]);
}
TParquetSchema parquetSchema = new TParquetSchema();
- parquetSchema.schema_repetition_type =
PARQUET_REPETITION_TYPE_MAP.get(properties[0]);
- parquetSchema.schema_data_type =
PARQUET_DATA_TYPE_MAP.get(properties[1]);
- parquetSchema.schema_column_name = properties[2];
+ parquetSchema.schema_repetition_type =
PARQUET_REPETITION_TYPE_MAP.get(fields[0]);
+ parquetSchema.schema_data_type =
PARQUET_DATA_TYPE_MAP.get(fields[1]);
+ parquetSchema.schema_column_name = fields[2];
parquetSchemas.add(parquetSchema);
}
- processedPropKeys.add(SCHEMA);
+ copiedProps.remove(SCHEMA);
}
- private void getOrcProperties(Set<String> processedPropKeys) throws AnalysisException {
- // get compression type
- // save compress type
- if (properties.containsKey(COMPRESS_TYPE)) {
- if (ORC_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
- this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(properties.get(COMPRESS_TYPE).toLowerCase());
- processedPropKeys.add(COMPRESS_TYPE);
- } else {
- throw new AnalysisException("orc compression type [" + properties.get(COMPRESS_TYPE) + "] is invalid,"
- + " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN");
- }
- }
-
+ private void getOrcProperties(Map<String, String> copiedProps) throws AnalysisException {
// check schema. if schema is not set, Doris will gen schema by select items
- String schema = properties.get(SCHEMA);
+ String schema = copiedProps.get(SCHEMA);
if (schema == null) {
return;
}
@@ -800,15 +685,15 @@ public class OutFileClause {
schema = schema.toLowerCase();
String[] schemas = schema.split(";");
for (String item : schemas) {
- String[] properties = item.split(",");
- if (properties.length != 2) {
+ String[] fields = item.split(",");
+ if (fields.length != 2) {
throw new AnalysisException("must only contains type and
column name");
}
- if (!ORC_DATA_TYPE.contains(properties[1]) &&
!properties[1].startsWith("decimal")) {
- throw new AnalysisException("data type is not supported:" +
properties[1]);
- } else if (!ORC_DATA_TYPE.contains(properties[1]) &&
properties[1].startsWith("decimal")) {
+ if (!ORC_DATA_TYPE.contains(fields[1]) &&
!fields[1].startsWith("decimal")) {
+ throw new AnalysisException("data type is not supported:" +
fields[1]);
+ } else if (!ORC_DATA_TYPE.contains(fields[1]) &&
fields[1].startsWith("decimal")) {
String errorMsg = "Format of decimal type must be
decimal(%d,%d)";
- String precisionAndScale = properties[1].substring(0,
"decimal".length()).trim();
+ String precisionAndScale = fields[1].substring(0,
"decimal".length()).trim();
if (!precisionAndScale.startsWith("(") ||
!precisionAndScale.endsWith(")")) {
throw new AnalysisException(errorMsg);
}
@@ -817,17 +702,17 @@ public class OutFileClause {
throw new AnalysisException(errorMsg);
}
}
- orcSchemas.add(Pair.of(properties[0], properties[1]));
+ orcSchemas.add(Pair.of(fields[0], fields[1]));
}
- processedPropKeys.add(SCHEMA);
+ copiedProps.remove(SCHEMA);
}
private boolean isParquetFormat() {
- return fileFormatType == TFileFormatType.FORMAT_PARQUET;
+ return fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET;
}
private boolean isOrcFormat() {
- return fileFormatType == TFileFormatType.FORMAT_ORC;
+ return fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC;
}
public String getFilePath() {
@@ -845,7 +730,8 @@ public class OutFileClause {
public String toSql() {
StringBuilder sb = new StringBuilder();
- sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS
").append(format);
+ sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ")
+ .append(fileFormatProperties.getFileFormatType());
if (properties != null && !properties.isEmpty()) {
sb.append(" PROPERTIES(");
sb.append(new PrintableMap<>(properties, " = ", true, false));
@@ -864,11 +750,10 @@ public class OutFileClause {
}
public TResultFileSinkOptions toSinkOptions() {
- TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath, fileFormatType);
- if (Util.isCsvFormat(fileFormatType)) {
- sinkOptions.setColumnSeparator(columnSeparator);
- sinkOptions.setLineDelimiter(lineDelimiter);
- }
+ TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath,
+ fileFormatProperties.getFileFormatType());
+ fileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+
sinkOptions.setMaxFileSizeBytes(maxFileSizeBytes);
sinkOptions.setDeleteExistingFiles(deleteExistingFiles);
sinkOptions.setFileSuffix(fileSuffix);
@@ -883,15 +768,10 @@ public class OutFileClause {
sinkOptions.setSuccessFileName(successFileName);
}
if (isParquetFormat()) {
- sinkOptions.setParquetCompressionType(parquetCompressionType);
- sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
- sinkOptions.setParquetVersion(parquetVersion);
sinkOptions.setParquetSchemas(parquetSchemas);
}
if (isOrcFormat()) {
sinkOptions.setOrcSchema(serializeOrcSchema());
- sinkOptions.setOrcCompressionType(orcCompressionType);
- sinkOptions.setOrcWriterVersion(1);
}
return sinkOptions;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index c394d9abc28..99e226f87ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -614,7 +614,11 @@ public class Util {
return TFileCompressType.UNKNOWN;
}
final String upperCaseType = compressType.toUpperCase();
- return TFileCompressType.valueOf(upperCaseType);
+ try {
+ return TFileCompressType.valueOf(upperCaseType);
+ } catch (IllegalArgumentException e) {
+ return TFileCompressType.UNKNOWN;
+ }
}
/**
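
The Util change above makes compress-type parsing tolerant: an unrecognized string now
maps to TFileCompressType.UNKNOWN instead of escaping as an IllegalArgumentException.
A brief sketch of the resulting behavior (the input strings are illustrative):

    TFileCompressType a = Util.getFileCompressType("zstd");     // -> TFileCompressType.ZSTD
    TFileCompressType b = Util.getFileCompressType("no-such");  // -> TFileCompressType.UNKNOWN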
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
new file mode 100644
index 00000000000..6d2b799ea00
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class AvroFileFormatProperties extends FileFormatProperties {
+ public AvroFileFormatProperties() {
+ super(TFileFormatType.FORMAT_AVRO);
+ }
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ }
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+ }
+
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+ fileAttributes.setTextParams(fileTextScanRangeParams);
+ return fileAttributes;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
new file mode 100644
index 00000000000..0efea98a5c3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+import org.apache.doris.thrift.TTextSerdeType;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public class CsvFileFormatProperties extends FileFormatProperties {
+ public static final Logger LOG = LogManager.getLogger(
+ org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class);
+
+ public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
+ public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+ public static final String PROP_COLUMN_SEPARATOR = "column_separator";
+ public static final String PROP_LINE_DELIMITER = "line_delimiter";
+
+ public static final String PROP_SKIP_LINES = "skip_lines";
+ public static final String PROP_CSV_SCHEMA = "csv_schema";
+ public static final String PROP_COMPRESS_TYPE = "compress_type";
+ public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+
+ public static final String PROP_ENCLOSE = "enclose";
+
+ private String headerType = "";
+ private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
+ private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
+ private String lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private boolean trimDoubleQuotes;
+ private int skipLines;
+ private byte enclose;
+
+ // used by tvf
+ // User specified csv columns, it will override columns got from file
+ private final List<Column> csvSchema = Lists.newArrayList();
+
+ String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
+
+ public CsvFileFormatProperties() {
+ super(TFileFormatType.FORMAT_CSV_PLAIN);
+ }
+
+ public CsvFileFormatProperties(String defaultColumnSeparator, TTextSerdeType textSerdeType) {
+ super(TFileFormatType.FORMAT_CSV_PLAIN);
+ this.defaultColumnSeparator = defaultColumnSeparator;
+ this.textSerdeType = textSerdeType;
+ }
+
+ public CsvFileFormatProperties(String headerType) {
+ super(TFileFormatType.FORMAT_CSV_PLAIN);
+ this.headerType = headerType;
+ }
+
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ try {
+ // analyze properties specified by user
+ columnSeparator = getOrDefault(formatProperties, PROP_COLUMN_SEPARATOR,
+ defaultColumnSeparator, isRemoveOriginProperty);
+ if (Strings.isNullOrEmpty(columnSeparator)) {
+ throw new AnalysisException("column_separator can not be
empty.");
+ }
+ columnSeparator = Separator.convertSeparator(columnSeparator);
+
+ lineDelimiter = getOrDefault(formatProperties, PROP_LINE_DELIMITER,
+ DEFAULT_LINE_DELIMITER, isRemoveOriginProperty);
+ if (Strings.isNullOrEmpty(lineDelimiter)) {
+ throw new AnalysisException("line_delimiter can not be
empty.");
+ }
+ lineDelimiter = Separator.convertSeparator(lineDelimiter);
+
+ String enclosedString = getOrDefault(formatProperties, PROP_ENCLOSE,
+ "", isRemoveOriginProperty);
+ if (!Strings.isNullOrEmpty(enclosedString)) {
+ if (enclosedString.length() > 1) {
+ throw new AnalysisException("enclose should not be longer
than one byte.");
+ }
+ enclose = (byte) enclosedString.charAt(0);
+ if (enclose == 0) {
+ throw new AnalysisException("enclose should not be byte
[0].");
+ }
+ }
+
+ trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties,
+ PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty))
+ .booleanValue();
+ skipLines = Integer.valueOf(getOrDefault(formatProperties,
+ PROP_SKIP_LINES, "0", isRemoveOriginProperty)).intValue();
+ if (skipLines < 0) {
+ throw new AnalysisException("skipLines should not be less than
0.");
+ }
+
+ String compressTypeStr = getOrDefault(formatProperties,
+ PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty);
+ compressionType = Util.getFileCompressType(compressTypeStr);
+
+ } catch (org.apache.doris.common.AnalysisException e) {
+ throw new AnalysisException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+ sinkOptions.setColumnSeparator(columnSeparator);
+ sinkOptions.setLineDelimiter(lineDelimiter);
+ }
+
+ // The method `analyzeFileFormatProperties` must have been called once before this method
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+ fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
+ fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
+ if (this.enclose != 0) {
+ fileTextScanRangeParams.setEnclose(this.enclose);
+ }
+ fileAttributes.setTextParams(fileTextScanRangeParams);
+ fileAttributes.setHeaderType(headerType);
+ fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
+ fileAttributes.setSkipLines(skipLines);
+ fileAttributes.setEnableTextValidateUtf8(
+ ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
+ return fileAttributes;
+ }
+
+ public String getHeaderType() {
+ return headerType;
+ }
+
+ public TTextSerdeType getTextSerdeType() {
+ return textSerdeType;
+ }
+
+ public String getColumnSeparator() {
+ return columnSeparator;
+ }
+
+ public String getLineDelimiter() {
+ return lineDelimiter;
+ }
+
+ public boolean isTrimDoubleQuotes() {
+ return trimDoubleQuotes;
+ }
+
+ public int getSkipLines() {
+ return skipLines;
+ }
+
+ public byte getEnclose() {
+ return enclose;
+ }
+
+ public List<Column> getCsvSchema() {
+ return csvSchema;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
new file mode 100644
index 00000000000..bd0ecc214c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+import org.apache.doris.thrift.TTextSerdeType;
+
+import java.util.Map;
+
+public abstract class FileFormatProperties {
+ public static final String PROP_FORMAT = "format";
+ public static final String FORMAT_PARQUET = "parquet";
+ public static final String FORMAT_CSV = "csv";
+ public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
+ public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
+ public static final String FORMAT_HIVE_TEXT = "hive_text";
+ public static final String FORMAT_ORC = "orc";
+ public static final String FORMAT_JSON = "json";
+ public static final String FORMAT_AVRO = "avro";
+ public static final String FORMAT_WAL = "wal";
+ public static final String FORMAT_ARROW = "arrow";
+ public static final String PROP_COMPRESS_TYPE = "compress_type";
+
+ protected TFileFormatType fileFormatType;
+
+ protected TFileCompressType compressionType;
+
+ public FileFormatProperties(TFileFormatType fileFormatType) {
+ this.fileFormatType = fileFormatType;
+ }
+
+ /**
+ * Analyze user properties
+ * @param formatProperties properties specified by user
+ * @param isRemoveOriginProperty if this param is set to true, then this method would remove the origin property
+ * @throws AnalysisException
+ */
+ public abstract void analyzeFileFormatProperties(
+ Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+ throws AnalysisException;
+
+ /**
+ * generate TResultFileSinkOptions according to the properties of specified file format
+ * You must call method `analyzeFileFormatProperties` once before calling method `fullTResultFileSinkOptions`
+ */
+ public abstract void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions);
+
+ /**
+ * generate TFileAttributes according to the properties of specified file format
+ * You must call method `analyzeFileFormatProperties` once before calling method `toTFileAttributes`
+ */
+ public abstract TFileAttributes toTFileAttributes();
+
+ public static FileFormatProperties createFileFormatProperties(String formatString) {
+ switch (formatString) {
+ case FORMAT_CSV:
+ return new CsvFileFormatProperties();
+ case FORMAT_HIVE_TEXT:
+ return new CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR,
+ TTextSerdeType.HIVE_TEXT_SERDE);
+ case FORMAT_CSV_WITH_NAMES:
+ return new CsvFileFormatProperties(
+ FORMAT_CSV_WITH_NAMES);
+ case FORMAT_CSV_WITH_NAMES_AND_TYPES:
+ return new CsvFileFormatProperties(
+ FORMAT_CSV_WITH_NAMES_AND_TYPES);
+ case FORMAT_PARQUET:
+ return new ParquetFileFormatProperties();
+ case FORMAT_ORC:
+ return new OrcFileFormatProperties();
+ case FORMAT_JSON:
+ return new JsonFileFormatProperties();
+ case FORMAT_AVRO:
+ return new AvroFileFormatProperties();
+ case FORMAT_WAL:
+ return new WalFileFormatProperties();
+ default:
+ throw new AnalysisException("format:" + formatString + " is
not supported.");
+ }
+ }
+
+ public static FileFormatProperties createFileFormatProperties(Map<String, String> formatProperties)
+ throws AnalysisException {
+ String formatString = formatProperties.getOrDefault(PROP_FORMAT, "")
+ .toLowerCase();
+ return createFileFormatProperties(formatString);
+ }
+
+ protected String getOrDefault(Map<String, String> props, String key, String defaultValue,
+ boolean isRemove) {
+ String value = props.getOrDefault(key, defaultValue);
+ if (isRemove) {
+ props.remove(key);
+ }
+ return value;
+ }
+
+ public TFileFormatType getFileFormatType() {
+ return fileFormatType;
+ }
+
+ public TFileCompressType getCompressionType() {
+ return compressionType;
+ }
+}
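
The abstract base above fixes the per-format contract: a factory entry in
createFileFormatProperties, an analyze pass that consumes recognized keys, and two
emit methods (sink options for OUTFILE, file attributes for TVF scans). A sketch of
what a new format would plug in; ExampleFileFormatProperties is hypothetical and
shown only to illustrate the extension points:

    // Hypothetical subclass illustrating the contract; not part of this patch.
    public class ExampleFileFormatProperties extends FileFormatProperties {
        public ExampleFileFormatProperties() {
            super(TFileFormatType.FORMAT_CSV_PLAIN); // a real format would pass its own enum value
        }

        @Override
        public void analyzeFileFormatProperties(Map<String, String> formatProperties,
                boolean isRemoveOriginProperty) throws AnalysisException {
            // consume known keys, e.g. getOrDefault(formatProperties, "my_key", "default", isRemoveOriginProperty)
        }

        @Override
        public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
            // copy analyzed settings onto the OUTFILE sink options
        }

        @Override
        public TFileAttributes toTFileAttributes() {
            return new TFileAttributes(); // plus format-specific scan params
        }
    }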
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
new file mode 100644
index 00000000000..3431d366f8b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class JsonFileFormatProperties extends FileFormatProperties {
+ public static final String PROP_JSON_ROOT = "json_root";
+ public static final String PROP_JSON_PATHS = "jsonpaths";
+ public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array";
+ public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
+ public static final String PROP_NUM_AS_STRING = "num_as_string";
+ public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
+
+ // from ExternalFileTableValuedFunction:
+ private String jsonRoot = "";
+ private String jsonPaths = "";
+ private boolean stripOuterArray;
+ private boolean readJsonByLine;
+ private boolean numAsString;
+ private boolean fuzzyParse;
+
+
+ public JsonFileFormatProperties() {
+ super(TFileFormatType.FORMAT_JSON);
+ }
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ jsonRoot = getOrDefault(formatProperties, PROP_JSON_ROOT,
+ "", isRemoveOriginProperty);
+ jsonPaths = getOrDefault(formatProperties, PROP_JSON_PATHS,
+ "", isRemoveOriginProperty);
+ readJsonByLine = Boolean.valueOf(
+ getOrDefault(formatProperties, PROP_READ_JSON_BY_LINE,
+ "", isRemoveOriginProperty)).booleanValue();
+ stripOuterArray = Boolean.valueOf(
+ getOrDefault(formatProperties, PROP_STRIP_OUTER_ARRAY,
+ "", isRemoveOriginProperty)).booleanValue();
+ numAsString = Boolean.valueOf(
+ getOrDefault(formatProperties, PROP_NUM_AS_STRING,
+ "", isRemoveOriginProperty)).booleanValue();
+ fuzzyParse = Boolean.valueOf(
+ getOrDefault(formatProperties, PROP_FUZZY_PARSE,
+ "", isRemoveOriginProperty)).booleanValue();
+
+ String compressTypeStr = getOrDefault(formatProperties, PROP_COMPRESS_TYPE,
+ "UNKNOWN", isRemoveOriginProperty);
+ compressionType = Util.getFileCompressType(compressTypeStr);
+ }
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+ }
+
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+ fileTextScanRangeParams.setLineDelimiter(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER);
+ fileAttributes.setTextParams(fileTextScanRangeParams);
+ fileAttributes.setJsonRoot(jsonRoot);
+ fileAttributes.setJsonpaths(jsonPaths);
+ fileAttributes.setReadJsonByLine(readJsonByLine);
+ fileAttributes.setStripOuterArray(stripOuterArray);
+ fileAttributes.setNumAsString(numAsString);
+ fileAttributes.setFuzzyParse(fuzzyParse);
+ return fileAttributes;
+ }
+
+ public String getJsonRoot() {
+ return jsonRoot;
+ }
+
+ public String getJsonPaths() {
+ return jsonPaths;
+ }
+
+ public boolean isStripOuterArray() {
+ return stripOuterArray;
+ }
+
+ public boolean isReadJsonByLine() {
+ return readJsonByLine;
+ }
+
+ public boolean isNumAsString() {
+ return numAsString;
+ }
+
+ public boolean isFuzzyParse() {
+ return fuzzyParse;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
new file mode 100644
index 00000000000..412c3d237e8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class OrcFileFormatProperties extends FileFormatProperties {
+ public static final Map<String, TFileCompressType> ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
+
+ static {
+ ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
+ ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
+ ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
+ ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
+ }
+
+ private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
+
+ public OrcFileFormatProperties() {
+ super(TFileFormatType.FORMAT_ORC);
+ }
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ // get compression type
+ // save compress type
+ if (formatProperties.containsKey(PROP_COMPRESS_TYPE)) {
+ if (ORC_COMPRESSION_TYPE_MAP.containsKey(
+ formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase())) {
+ this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(
+ formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase());
+ formatProperties.remove(PROP_COMPRESS_TYPE);
+ } else {
+ throw new AnalysisException("orc compression type ["
+ + formatProperties.get(PROP_COMPRESS_TYPE) + "] is invalid,"
+ + " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN");
+ }
+ }
+ }
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+ sinkOptions.setOrcCompressionType(orcCompressionType);
+ sinkOptions.setOrcWriterVersion(1);
+ }
+
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+ fileAttributes.setTextParams(fileTextScanRangeParams);
+ return fileAttributes;
+ }
+
+ public TFileCompressType getOrcCompressionType() {
+ return orcCompressionType;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
new file mode 100644
index 00000000000..8063b25964a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TParquetCompressionType;
+import org.apache.doris.thrift.TParquetVersion;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class ParquetFileFormatProperties extends FileFormatProperties {
+ public static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
+ public static final String PARQUET_VERSION = "version";
+ public static final String PARQUET_PROP_PREFIX = "parquet.";
+
+ public static final Logger LOG = LogManager.getLogger(ParquetFileFormatProperties.class);
+ public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
+ public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
+
+ static {
+ PARQUET_COMPRESSION_TYPE_MAP.put("snappy",
TParquetCompressionType.SNAPPY);
+ PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
+ PARQUET_COMPRESSION_TYPE_MAP.put("brotli",
TParquetCompressionType.BROTLI);
+ PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
+ PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
+ // arrow do not support lzo and bz2 compression type.
+ // PARQUET_COMPRESSION_TYPE_MAP.put("lzo",
TParquetCompressionType.LZO);
+ // PARQUET_COMPRESSION_TYPE_MAP.put("bz2",
TParquetCompressionType.BZ2);
+ PARQUET_COMPRESSION_TYPE_MAP.put("plain",
TParquetCompressionType.UNCOMPRESSED);
+
+ PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
+ PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
+ }
+
+ private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
+ private boolean parquetDisableDictionary = false;
+ private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+
+ public ParquetFileFormatProperties() {
+ super(TFileFormatType.FORMAT_PARQUET);
+ }
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ // save compress type
+ if (formatProperties.containsKey(PROP_COMPRESS_TYPE)) {
+ if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(formatProperties.get(PROP_COMPRESS_TYPE)
+ .toLowerCase())) {
+ this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
+ formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase());
+ formatProperties.remove(PROP_COMPRESS_TYPE);
+ } else {
+ throw new AnalysisException("parquet compression type ["
+ + formatProperties.get(PROP_COMPRESS_TYPE)
+ + "] is invalid, please choose one among SNAPPY, GZIP,
BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN");
+ }
+ }
+
+ // save all parquet prefix property
+ Iterator<Entry<String, String>> iter = formatProperties.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, String> entry = iter.next();
+ if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
+ iter.remove();
+ if (entry.getKey().substring(PARQUET_PROP_PREFIX.length())
+ .equals(PARQUET_DISABLE_DICTIONARY)) {
+ this.parquetDisableDictionary = Boolean.valueOf(entry.getValue());
+ } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length())
+ .equals(PARQUET_VERSION)) {
+ if (PARQUET_VERSION_MAP.containsKey(entry.getValue())) {
+ this.parquetVersion = PARQUET_VERSION_MAP.get(entry.getValue());
+ } else {
+ LOG.debug("not set parquet version type or is invalid,
set default to PARQUET_1.0 version.");
+ }
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+ sinkOptions.setParquetCompressionType(parquetCompressionType);
+ sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
+ sinkOptions.setParquetVersion(parquetVersion);
+ }
+
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+ fileAttributes.setTextParams(fileTextScanRangeParams);
+ return fileAttributes;
+ }
+
+ public TParquetCompressionType getParquetCompressionType() {
+ return parquetCompressionType;
+ }
+
+ public boolean isParquetDisableDictionary() {
+ return parquetDisableDictionary;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
new file mode 100644
index 00000000000..0c6b1777cf6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class WalFileFormatProperties extends FileFormatProperties {
+ public WalFileFormatProperties() {
+ super(TFileFormatType.FORMAT_WAL);
+ }
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+ }
+
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+ fileAttributes.setTextParams(fileTextScanRangeParams);
+ return fileAttributes;
+ }
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 3dbd5bc2115..2f8f9ae8ab1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.util.FileFormatConstants;
+import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
@@ -69,11 +70,13 @@ public class ResultFileSink extends DataSink {
     public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList<String> labels) {
         this(exchNodeId, outFileClause);
-        if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
-                || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
-            header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
+        if (Util.isCsvFormat(outFileClause.getFileFormatType())) {
+            if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+                    || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
+                header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
+            }
+            headerType = outFileClause.getHeaderType();
         }
-        headerType = outFileClause.getHeaderType();
     }
public String getBrokerName() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 5a15ccd21f5..9ab6d302c3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -18,7 +18,6 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
@@ -41,6 +40,8 @@ import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.PlanNodeId;
@@ -63,15 +64,12 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.thrift.TTextSerdeType;
-import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -110,25 +108,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
protected Map<String, String> locationProperties = Maps.newHashMap();
protected String filePath;
- protected TFileFormatType fileFormatType;
-
protected Optional<String> resourceName = Optional.empty();
- private TFileCompressType compressionType;
- private String headerType = "";
-
- private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
-    private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
- private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER;
- private byte enclose = 0;
- private String jsonRoot = "";
- private String jsonPaths = "";
- private boolean stripOuterArray;
- private boolean readJsonByLine;
- private boolean numAsString;
- private boolean fuzzyParse;
- private boolean trimDoubleQuotes;
- private int skipLines;
+ public FileFormatProperties fileFormatProperties;
private long tableId;
public abstract TFileType getTFileType();
@@ -138,11 +120,11 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
public abstract BrokerDesc getBrokerDesc();
public TFileFormatType getTFileFormatType() {
- return fileFormatType;
+ return fileFormatProperties.getFileFormatType();
}
public TFileCompressType getTFileCompressType() {
- return compressionType;
+ return fileFormatProperties.getCompressionType();
}
public Map<String, String> getLocationProperties() {
@@ -179,94 +161,15 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
         copiedProps.putAll(mergedProperties);
-        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
-        String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
-        switch (formatString) {
-            case "csv":
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "hive_text":
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
-                this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE;
-                break;
-            case "csv_with_names":
-                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "csv_with_names_and_types":
-                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "parquet":
-                this.fileFormatType = TFileFormatType.FORMAT_PARQUET;
-                break;
-            case "orc":
-                this.fileFormatType = TFileFormatType.FORMAT_ORC;
-                break;
-            case "json":
-                this.fileFormatType = TFileFormatType.FORMAT_JSON;
-                break;
-            case "avro":
-                this.fileFormatType = TFileFormatType.FORMAT_AVRO;
-                break;
-            case "wal":
-                this.fileFormatType = TFileFormatType.FORMAT_WAL;
-                break;
-            default:
-                throw new AnalysisException("format:" + formatString + " is not supported.");
-        }
-
         tableId = Long.valueOf(getOrDefaultAndRemove(copiedProps, PROP_TABLE_ID, "-1"));
-        columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR,
-                defaultColumnSeparator);
-        if (Strings.isNullOrEmpty(columnSeparator)) {
-            throw new AnalysisException("column_separator can not be empty.");
-        }
-        columnSeparator = Separator.convertSeparator(columnSeparator);
-
-        lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER,
-                FileFormatConstants.DEFAULT_LINE_DELIMITER);
-        if (Strings.isNullOrEmpty(lineDelimiter)) {
-            throw new AnalysisException("line_delimiter can not be empty.");
-        }
-        lineDelimiter = Separator.convertSeparator(lineDelimiter);
-        String enclosedString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_ENCLOSE, "");
-        if (!Strings.isNullOrEmpty(enclosedString)) {
-            if (enclosedString.length() > 1) {
-                throw new AnalysisException("enclose should not be longer than one byte.");
-            }
-            enclose = (byte) enclosedString.charAt(0);
-            if (enclose == 0) {
-                throw new AnalysisException("enclose should not be byte [0].");
-            }
-        }
+        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
+        fileFormatProperties = FileFormatProperties.createFileFormatProperties(formatString);
+        fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
-        jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, "");
-        jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, "");
-        readJsonByLine = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue();
-        stripOuterArray = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue();
-        numAsString = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue();
-        fuzzyParse = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue();
-        trimDoubleQuotes = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, "")).booleanValue();
-        skipLines = Integer.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue();
-
-        String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN");
-        try {
-            compressionType = Util.getFileCompressType(compressTypeStr);
-        } catch (IllegalArgumentException e) {
-            throw new AnalysisException("Compress type : " + compressTypeStr + " is not supported.");
-        }
-        if (FileFormatUtils.isCsv(formatString)) {
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
             FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
-                    FileFormatConstants.PROP_CSV_SCHEMA, ""));
+                    CsvFileFormatProperties.PROP_CSV_SCHEMA, ""));
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get csv schema: {}", csvSchema);
             }
@@ -293,29 +196,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
}
public TFileAttributes getFileAttributes() {
-        TFileAttributes fileAttributes = new TFileAttributes();
-        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
-        fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
-        fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
-        if (enclose != 0) {
-            fileTextScanRangeParams.setEnclose(enclose);
-        }
-        fileAttributes.setTextParams(fileTextScanRangeParams);
-        if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
-            fileAttributes.setHeaderType(this.headerType);
-            fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
-            fileAttributes.setSkipLines(skipLines);
-            fileAttributes.setEnableTextValidateUtf8(
-                    ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
-        } else if (this.fileFormatType == TFileFormatType.FORMAT_JSON) {
-            fileAttributes.setJsonRoot(jsonRoot);
-            fileAttributes.setJsonpaths(jsonPaths);
-            fileAttributes.setReadJsonByLine(readJsonByLine);
-            fileAttributes.setStripOuterArray(stripOuterArray);
-            fileAttributes.setNumAsString(numAsString);
-            fileAttributes.setFuzzyParse(fuzzyParse);
-        }
-        return fileAttributes;
+        return fileFormatProperties.toTFileAttributes();
}
@Override
@@ -345,7 +226,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
throw new AnalysisException("No Alive backends");
}
-        if (this.fileFormatType == TFileFormatType.FORMAT_WAL) {
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_WAL) {
             List<Column> fileColumns = new ArrayList<>();
             Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
List<Column> tableColumns = table.getBaseSchema(true);
@@ -452,7 +333,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
                 Pair<Type, Integer> fieldType = getColumnType(typeNodes, start + parsedNodes);
                 PStructField structField = typeNodes.get(start).getStructFields(i);
                 fields.add(new StructField(structField.getName(), fieldType.key(), structField.getComment(),
-                        structField.getContainsNull()));
+                    structField.getContainsNull()));
parsedNodes += fieldType.value();
}
type = new StructType(fields);
@@ -488,9 +369,11 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TException {
         // set TFileScanRangeParams
         TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
-        fileScanRangeParams.setFormatType(fileFormatType);
+        fileScanRangeParams.setFormatType(fileFormatProperties.getFileFormatType());
         fileScanRangeParams.setProperties(locationProperties);
-        fileScanRangeParams.setTextSerdeType(textSerdeType);
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
+            fileScanRangeParams.setTextSerdeType(((CsvFileFormatProperties) fileFormatProperties).getTextSerdeType());
+        }
fileScanRangeParams.setFileAttributes(getFileAttributes());
ConnectContext ctx = ConnectContext.get();
fileScanRangeParams.setLoadId(ctx.queryId());
@@ -529,7 +412,8 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
         fileRangeDesc.setLoadId(ctx.queryId());
         fileRangeDesc.setFileType(getTFileType());
-        fileRangeDesc.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
+        fileRangeDesc.setCompressType(Util.getOrInferCompressType(
+                fileFormatProperties.getCompressionType(), firstFile.getPath()));
fileRangeDesc.setPath(firstFile.getPath());
fileRangeDesc.setStartOffset(0);
fileRangeDesc.setSize(firstFile.getSize());
@@ -547,9 +431,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         if (fileStatus.isIsDir() || fileStatus.size == 0) {
             return true;
         }
-        if (Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON) {
+        if (Util.isCsvFormat(fileFormatProperties.getFileFormatType())
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_JSON) {
int magicNumberBytes = 0;
- switch (compressionType) {
+ switch (fileFormatProperties.getCompressionType()) {
case GZ:
magicNumberBytes = 20;
break;
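
Taken together, the TVF hunks above reduce format handling to a single flow. A sketch using only methods that appear in this diff (the "csv" literal stands in for whatever the user passed via the "format" property):

    // Sketch, not part of the commit: parseCommonProperties() now builds one
    // FileFormatProperties and every consumer delegates to it.
    String formatString = "csv";
    fileFormatProperties = FileFormatProperties.createFileFormatProperties(formatString);
    fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);

    // Later consumers no longer branch on local fields:
    TFileFormatType formatType = fileFormatProperties.getFileFormatType();      // replaces this.fileFormatType
    TFileCompressType compressType = fileFormatProperties.getCompressionType(); // replaces this.compressionType
    TFileAttributes fileAttributes = fileFormatProperties.toTFileAttributes();  // replaces the hand-built attributes
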
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
index 5044f045c31..72573a2355f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
@@ -40,9 +40,9 @@ public class HttpStreamTableValuedFunction extends ExternalFileTableValuedFuncti
         // 1. analyze common properties
         super.parseCommonProperties(properties);
-        if (fileFormatType == TFileFormatType.FORMAT_PARQUET
-                || fileFormatType == TFileFormatType.FORMAT_AVRO
-                || fileFormatType == TFileFormatType.FORMAT_ORC) {
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_AVRO
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC) {
             throw new AnalysisException("http_stream does not yet support parquet, avro and orc");
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..a7fc534e0de
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class AvroFileFormatPropertiesTest {
+
+ private AvroFileFormatProperties avroFileFormatProperties;
+
+ @Before
+ public void setUp() {
+ avroFileFormatProperties = new AvroFileFormatProperties();
+ }
+
+ @Test
+ public void testAnalyzeFileFormatProperties() {
+ Map<String, String> properties = new HashMap<>();
+ // Add properties if needed
+ avroFileFormatProperties.analyzeFileFormatProperties(properties, true);
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..a496378b5e5
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileCompressType;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CsvFileFormatPropertiesTest {
+
+ private CsvFileFormatProperties csvFileFormatProperties;
+
+ @Before
+ public void setUp() {
+ csvFileFormatProperties = new CsvFileFormatProperties();
+ }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValid() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, ",");
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "\n");
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "1");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals(",", csvFileFormatProperties.getColumnSeparator());
+        Assert.assertEquals("\n", csvFileFormatProperties.getLineDelimiter());
+        Assert.assertEquals(1, csvFileFormatProperties.getSkipLines());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidSeparator() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, "");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidLineDelimiter() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidEnclose() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "invalid");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEnclose() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "\"");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals((byte) '"', csvFileFormatProperties.getEnclose());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesNegative() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "-1");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesLargeValue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "1000");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(1000, csvFileFormatProperties.getSkipLines());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesTrimDoubleQuotesTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "true");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesTrimDoubleQuotesFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "false");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.UNKNOWN, csvFileFormatProperties.getCompressionType());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidCompressType() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "gz");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.GZ, csvFileFormatProperties.getCompressionType());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmptyCsvSchema() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_CSV_SCHEMA, "");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEncloseMultipleCharacters() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "\"\"");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEncloseEmpty() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(0, csvFileFormatProperties.getEnclose());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesAsString() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "abc");
+
+        Assert.assertThrows(NumberFormatException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidColumnSeparator() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, ";");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(";", csvFileFormatProperties.getColumnSeparator());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesLineDelimiterAsString() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "abc");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidLineDelimiter() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "\r\n");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("\r\n", csvFileFormatProperties.getLineDelimiter());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidTrimDoubleQuotes() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "true");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidTrimDoubleQuotes() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "invalid");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java
new file mode 100644
index 00000000000..74d8d0db2ad
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class FileFormatPropertiesTest {
+
+ @Test
+ public void testCreateFileFormatPropertiesInvalidFormat() {
+ Assert.assertThrows(AnalysisException.class, () -> {
+ FileFormatProperties.createFileFormatProperties("invalid_format");
+ });
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..f614d322386
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonFileFormatPropertiesTest {
+
+ private JsonFileFormatProperties jsonFileFormatProperties;
+
+ @Before
+ public void setUp() {
+ jsonFileFormatProperties = new JsonFileFormatProperties();
+ }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmpty() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonRoot());
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonPaths());
+        Assert.assertEquals(false, jsonFileFormatProperties.isStripOuterArray());
+        Assert.assertEquals(false, jsonFileFormatProperties.isReadJsonByLine());
+        Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidJsonRoot() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.items");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("data.items", jsonFileFormatProperties.getJsonRoot());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
+                "[\"$.name\", \"$.age\", \"$.city\"]");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("[\"$.name\", \"$.age\", \"$.city\"]", jsonFileFormatProperties.getJsonPaths());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesStripOuterArrayTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isStripOuterArray());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesStripOuterArrayFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isStripOuterArray());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesReadJsonByLineTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesReadJsonByLineFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isReadJsonByLine());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesNumAsStringTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesNumAsStringFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesFuzzyParseTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesFuzzyParseFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidBooleanValue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "invalid");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesAllProperties() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.records");
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS, "[\"$.id\", \"$.name\"]");
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "true");
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals("data.records", jsonFileFormatProperties.getJsonRoot());
+        Assert.assertEquals("[\"$.id\", \"$.name\"]", jsonFileFormatProperties.getJsonPaths());
+        Assert.assertEquals(true, jsonFileFormatProperties.isStripOuterArray());
+        Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
+        Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
+        Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSpecialCharactersInJsonRoot() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.special@#$%^&*()");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("data.special@#$%^&*()", jsonFileFormatProperties.getJsonRoot());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesComplexJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
+                "[\"$.deeply.nested[0].array[*].field\", \"$.complex.path[?(@.type=='value')]\"]");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("[\"$.deeply.nested[0].array[*].field\", \"$.complex.path[?(@.type=='value')]\"]",
+                jsonFileFormatProperties.getJsonPaths());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmptyJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS, "");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonPaths());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..0a63d0cec69
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OrcFileFormatPropertiesTest {
+
+ private OrcFileFormatProperties orcFileFormatProperties;
+
+ @Before
+ public void setUp() {
+ orcFileFormatProperties = new OrcFileFormatProperties();
+ }
+
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.ZLIB, orcFileFormatProperties.getOrcCompressionType());
+    }
+
+    @Test
+    public void testSupportedCompressionTypes() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "plain");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.PLAIN, orcFileFormatProperties.getOrcCompressionType());
+
+        properties.put("compress_type", "snappy");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.SNAPPYBLOCK, orcFileFormatProperties.getOrcCompressionType());
+
+        properties.put("compress_type", "zlib");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.ZLIB, orcFileFormatProperties.getOrcCompressionType());
+
+        properties.put("compress_type", "zstd");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.ZSTD, orcFileFormatProperties.getOrcCompressionType());
+    }
+
+    @Test
+    public void testCompressionTypeCaseInsensitive() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "SNAPPY");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.SNAPPYBLOCK, orcFileFormatProperties.getOrcCompressionType());
+    }
+
+    @Test(expected = org.apache.doris.nereids.exceptions.AnalysisException.class)
+    public void testInvalidCompressionType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "invalid_type");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testFullTResultFileSinkOptions() {
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        orcFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(orcFileFormatProperties.getOrcCompressionType(), sinkOptions.getOrcCompressionType());
+        Assert.assertEquals(1, sinkOptions.getOrcWriterVersion());
+    }
+
+    @Test
+    public void testToTFileAttributes() {
+        TFileAttributes attrs = orcFileFormatProperties.toTFileAttributes();
+        Assert.assertNotNull(attrs);
+        Assert.assertNotNull(attrs.getTextParams());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..754d857613f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TParquetCompressionType;
+import org.apache.doris.thrift.TParquetVersion;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParquetFileFormatPropertiesTest {
+
+ private ParquetFileFormatProperties parquetFileFormatProperties;
+
+ @Before
+ public void setUp() {
+ parquetFileFormatProperties = new ParquetFileFormatProperties();
+ }
+
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals(TParquetCompressionType.SNAPPY, parquetFileFormatProperties.getParquetCompressionType());
+        Assert.assertEquals(false, parquetFileFormatProperties.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testSupportedCompressionTypes() {
+        Map<String, String> properties = new HashMap<>();
+        String[] types = {"snappy", "gzip", "brotli", "zstd", "lz4", "plain"};
+        TParquetCompressionType[] expected = {
+                TParquetCompressionType.SNAPPY,
+                TParquetCompressionType.GZIP,
+                TParquetCompressionType.BROTLI,
+                TParquetCompressionType.ZSTD,
+                TParquetCompressionType.LZ4,
+                TParquetCompressionType.UNCOMPRESSED
+        };
+        for (int i = 0; i < types.length; i++) {
+            properties.put("compress_type", types[i]);
+            parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+            Assert.assertEquals(expected[i], parquetFileFormatProperties.getParquetCompressionType());
+        }
+    }
+
+    @Test
+    public void testCompressionTypeCaseInsensitive() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "SNAPPY");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TParquetCompressionType.SNAPPY, parquetFileFormatProperties.getParquetCompressionType());
+    }
+
+    @Test(expected = AnalysisException.class)
+    public void testInvalidCompressionType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "invalid_type");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testParquetDisableDictionary() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.disable_dictionary", "true");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertTrue(parquetFileFormatProperties.isParquetDisableDictionary());
+        properties.put("parquet.disable_dictionary", "false");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertFalse(parquetFileFormatProperties.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testParquetVersion() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.version", "v1");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_1_0, sinkOptions.getParquetVersion());
+
+        properties.put("parquet.version", "latest");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_2_LATEST, sinkOptions.getParquetVersion());
+    }
+
+    @Test
+    public void testParquetVersionInvalid() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.version", "invalid");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_1_0, sinkOptions.getParquetVersion());
+    }
+
+    @Test
+    public void testFullTResultFileSinkOptions() {
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(parquetFileFormatProperties.getParquetCompressionType(), sinkOptions.getParquetCompressionType());
+        Assert.assertEquals(parquetFileFormatProperties.isParquetDisableDictionary(), sinkOptions.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testToTFileAttributes() {
+        TFileAttributes attrs = parquetFileFormatProperties.toTFileAttributes();
+        Assert.assertNotNull(attrs);
+        Assert.assertNotNull(attrs.getTextParams());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..d94b49aca97
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WalFileFormatPropertiesTest {
+
+ private WalFileFormatProperties walFileFormatProperties;
+
+ @Before
+ public void setUp() {
+ walFileFormatProperties = new WalFileFormatProperties();
+ }
+
+ @Test
+ public void testAnalyzeFileFormatProperties() {
+ Map<String, String> properties = new HashMap<>();
+ // Add properties if needed
+ walFileFormatProperties.analyzeFileFormatProperties(properties, true);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]