This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new df9fe45e4d5 [feature](hive)support create hive table for text format
#41860 (#42175)
df9fe45e4d5 is described below
commit df9fe45e4d5cbc6607fec7e396e0cb24ddb4bed7
Author: Rayner Chen <[email protected]>
AuthorDate: Mon Oct 21 16:48:23 2024 +0800
[feature](hive)support create hive table for text format #41860 (#42175)
cherry pick from #41860
Co-authored-by: wuwenchi <[email protected]>
---
.../doris/datasource/hive/HiveProperties.java | 155 +++++++++++++++++++++
.../org/apache/doris/datasource/hive/HiveUtil.java | 18 ++-
.../doris/datasource/hive/source/HiveScanNode.java | 75 ++--------
.../iceberg/source/IcebergApiSource.java | 7 -
.../iceberg/source/IcebergHMSSource.java | 16 ---
.../datasource/iceberg/source/IcebergSource.java | 4 -
.../org/apache/doris/planner/HiveTableSink.java | 67 ++-------
.../java/org/apache/doris/qe/SessionVariable.java | 6 +-
.../hive/ddl/test_hive_ddl_text_format.groovy | 78 +++++++++++
9 files changed, 278 insertions(+), 148 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java
new file mode 100644
index 00000000000..5ded87e0d23
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+/**
+ * Property keys, default values and static helpers for the SerDe properties
+ * of Hive text tables.
+ *
+ * <p>Each getter looks its property up on a metastore {@link Table} through
+ * {@code HiveMetaStoreClientHelper.getSerdeProperty} and combines the result
+ * with the default declared here. {@link #setTableProperties} goes the other
+ * way: it splits a flat property map into SerDe parameters and table
+ * parameters and stores both on the table.
+ */
+public class HiveProperties {
+ public static final String PROP_FIELD_DELIMITER = "field.delim";
+ public static final String PROP_SEPARATOR_CHAR = "separatorChar";
+ public static final String PROP_SERIALIZATION_FORMAT =
"serialization.format";
+ public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
+ // Line delimiter key and its default.
+ public static final String PROP_LINE_DELIMITER = "line.delim";
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+ // Quote character key; no default is declared for it in this class.
+ public static final String PROP_QUOTE_CHAR = "quoteChar";
+ /*
+ * The collection delimiter is looked up under two spellings.
+ * NOTE(review): "colelction.delim" looks like a typo, but it presumably
+ * mirrors the key that Hive 2 itself writes - confirm before "fixing" it.
+ */
+ public static final String PROP_COLLECTION_DELIMITER_HIVE2 =
"colelction.delim";
+ public static final String PROP_COLLECTION_DELIMITER_HIVE3 =
"collection.delim";
+ public static final String DEFAULT_COLLECTION_DELIMITER = "\2";
+ // Map key/value delimiter key and its default.
+ public static final String PROP_MAP_KV_DELIMITER = "mapkey.delim";
+ public static final String DEFAULT_MAP_KV_DELIMITER = "\003";
+ // Escape character key and its default (a single backslash).
+ public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
+ public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
+ // Null marker key and its default (a backslash followed by N).
+ public static final String PROP_NULL_FORMAT = "serialization.null.format";
+ public static final String DEFAULT_NULL_FORMAT = "\\N";
+ // Keys that setTableProperties() stores as SerDe parameters rather than table parameters.
+ public static final Set<String> HIVE_SERDE_PROPERTIES = ImmutableSet.of(
+ PROP_FIELD_DELIMITER,
+ PROP_COLLECTION_DELIMITER_HIVE2,
+ PROP_COLLECTION_DELIMITER_HIVE3,
+ PROP_SEPARATOR_CHAR,
+ PROP_SERIALIZATION_FORMAT,
+ PROP_LINE_DELIMITER,
+ PROP_QUOTE_CHAR,
+ PROP_MAP_KV_DELIMITER,
+ PROP_ESCAPE_DELIMITER,
+ PROP_NULL_FORMAT
+ );
+ /**
+ * Resolves the field delimiter for a text-format table.
+ *
+ * <p>The {@code field.delim} and {@code serialization.format} SerDe
+ * properties are handed, in that order, to
+ * {@code HiveMetaStoreClientHelper.firstPresentOrDefault} with
+ * {@link #DEFAULT_FIELD_DELIMITER} as the default, and the chosen value is
+ * passed through {@code HiveMetaStoreClientHelper.getByte}. Unlike
+ * {@link #getColumnSeparator}, {@code separatorChar} is not consulted.
+ *
+ * @param table metastore table whose SerDe properties are read
+ * @return the value produced by {@code getByte} for the chosen delimiter
+ */
+ public static String getFieldDelimiter(Table table) {
+ // This method is used for text format.
+ // If you need compatibility with csv format, please use
`getColumnSeparator`.
+ Optional<String> fieldDelim =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER);
+ Optional<String> serFormat =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT);
+ return
HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+ DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat));
+ }
+ /**
+ * Resolves the column separator, additionally honouring the
+ * {@code separatorChar} property.
+ *
+ * <p>Candidates are handed to {@code firstPresentOrDefault} in the order
+ * {@code field.delim}, {@code separatorChar}, {@code serialization.format},
+ * with {@link #DEFAULT_FIELD_DELIMITER} as the default; the chosen value is
+ * passed through {@code HiveMetaStoreClientHelper.getByte}.
+ *
+ * @param table metastore table whose SerDe properties are read
+ * @return the value produced by {@code getByte} for the chosen separator
+ */
+ public static String getColumnSeparator(Table table) {
+ Optional<String> fieldDelim =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER);
+ Optional<String> columnSeparator =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SEPARATOR_CHAR);
+ Optional<String> serFormat =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT);
+ return
HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+ DEFAULT_FIELD_DELIMITER, fieldDelim, columnSeparator, serFormat));
+ }
+
+ /**
+ * Resolves the line delimiter from {@code line.delim}, with
+ * {@link #DEFAULT_LINE_DELIMITER} as the default, passed through
+ * {@code HiveMetaStoreClientHelper.getByte}.
+ *
+ * @param table metastore table whose SerDe properties are read
+ * @return the value produced by {@code getByte} for the line delimiter
+ */
+ public static String getLineDelimiter(Table table) {
+ Optional<String> lineDelim =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_LINE_DELIMITER);
+ return
HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+ DEFAULT_LINE_DELIMITER, lineDelim));
+ }
+ /**
+ * Resolves the map key/value delimiter from {@code mapkey.delim}, with
+ * {@link #DEFAULT_MAP_KV_DELIMITER} as the default, passed through
+ * {@code HiveMetaStoreClientHelper.getByte}.
+ *
+ * @param table metastore table whose SerDe properties are read
+ * @return the value produced by {@code getByte} for the map delimiter
+ */
+ public static String getMapKvDelimiter(Table table) {
+ Optional<String> mapkvDelim =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_MAP_KV_DELIMITER);
+ return
HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+ DEFAULT_MAP_KV_DELIMITER, mapkvDelim));
+ }
+ /**
+ * Resolves the collection delimiter.
+ *
+ * <p>The Hive 2 spelling of the key is handed to
+ * {@code firstPresentOrDefault} ahead of the Hive 3 spelling, with
+ * {@link #DEFAULT_COLLECTION_DELIMITER} as the default; the chosen value is
+ * passed through {@code HiveMetaStoreClientHelper.getByte}.
+ *
+ * @param table metastore table whose SerDe properties are read
+ * @return the value produced by {@code getByte} for the collection delimiter
+ */
+ public static String getCollectionDelimiter(Table table) {
+ Optional<String> collectionDelimHive2 =
HiveMetaStoreClientHelper.getSerdeProperty(table,
+ PROP_COLLECTION_DELIMITER_HIVE2);
+ Optional<String> collectionDelimHive3 =
HiveMetaStoreClientHelper.getSerdeProperty(table,
+ PROP_COLLECTION_DELIMITER_HIVE3);
+ return
HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+ DEFAULT_COLLECTION_DELIMITER, collectionDelimHive2,
collectionDelimHive3));
+ }
+ /**
+ * Returns the raw {@code quoteChar} SerDe parameter, if the parameter map
+ * contains that key.
+ *
+ * <p>This getter reads the SerDe parameter map directly instead of going
+ * through {@code HiveMetaStoreClientHelper.getSerdeProperty}, and the value
+ * is not passed through {@code getByte}.
+ * NOTE(review): the map is dereferenced without a null check, whereas
+ * {@link #setTableProperties} treats a null parameter map as possible -
+ * confirm callers only pass tables whose SerDe parameters are set.
+ *
+ * @param table metastore table whose SerDe parameters are read
+ * @return the quote character string, or empty when the key is absent
+ */
+ public static Optional<String> getQuoteChar(Table table) {
+ Map<String, String> serdeParams =
table.getSd().getSerdeInfo().getParameters();
+ if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
+ return Optional.of(serdeParams.get(PROP_QUOTE_CHAR));
+ }
+ return Optional.empty();
+ }
+ /**
+ * Returns the escape character, but only when {@code escape.delim} is set.
+ *
+ * <p>When the property is present its value is passed through
+ * {@code HiveMetaStoreClientHelper.getByte}; if that yields {@code null},
+ * {@link #DEFAULT_ESCAPE_DELIMIER} is returned instead. When the property
+ * is absent the result is empty, so no default escape character is implied.
+ *
+ * @param table metastore table whose SerDe properties are read
+ * @return the escape character string, or empty when the property is absent
+ */
+ public static Optional<String> getEscapeDelimiter(Table table) {
+ Optional<String> escapeDelim =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_ESCAPE_DELIMITER);
+ if (escapeDelim.isPresent()) {
+ String escape =
HiveMetaStoreClientHelper.getByte(escapeDelim.get());
+ if (escape != null) {
+ return Optional.of(escape);
+ } else {
+ return Optional.of(DEFAULT_ESCAPE_DELIMIER);
+ }
+ }
+ return Optional.empty();
+ }
+ /**
+ * Resolves the null marker from {@code serialization.null.format}, with
+ * {@link #DEFAULT_NULL_FORMAT} as the default. Unlike the delimiter
+ * getters, the value is not passed through {@code getByte}.
+ *
+ * @param table metastore table whose SerDe properties are read
+ * @return the result of {@code firstPresentOrDefault} for the null format
+ */
+ public static String getNullFormat(Table table) {
+ Optional<String> nullFormat =
HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_NULL_FORMAT);
+ return
HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_NULL_FORMAT,
nullFormat);
+ }
+
+ /**
+ * Stores {@code properties} on {@code table}, routing each entry by key.
+ *
+ * <p>Entries whose key is in {@link #HIVE_SERDE_PROPERTIES} go to the SerDe
+ * parameters of the table's storage descriptor; all other entries go to the
+ * table parameters. Existing parameter maps are merged into with
+ * {@code putAll}; a null map is replaced by the newly built one.
+ *
+ * @param table metastore table to update in place; its storage descriptor
+ * and SerDe info are dereferenced without null checks
+ * @param properties flat map of property key to value
+ */
+ public static void setTableProperties(Table table, Map<String, String>
properties) {
+ HashMap<String, String> serdeProps = new HashMap<>();
+ HashMap<String, String> tblProps = new HashMap<>();
+ // Partition the incoming map: known SerDe keys versus everything else.
+ for (String k : properties.keySet()) {
+ if (HIVE_SERDE_PROPERTIES.contains(k)) {
+ serdeProps.put(k, properties.get(k));
+ } else {
+ tblProps.put(k, properties.get(k));
+ }
+ }
+ // Merge into the existing table parameters, or install the new map if there are none.
+ if (table.getParameters() == null) {
+ table.setParameters(tblProps);
+ } else {
+ table.getParameters().putAll(tblProps);
+ }
+ // Same null-safe merge for the SerDe parameters.
+ if (table.getSd().getSerdeInfo().getParameters() == null) {
+ table.getSd().getSerdeInfo().setParameters(serdeProps);
+ } else {
+ table.getSd().getSerdeInfo().getParameters().putAll(serdeProps);
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index dac5d55e5ee..56acec782c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -25,6 +25,7 @@ import
org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -69,6 +70,8 @@ public final class HiveUtil {
public static final String COMPRESSION_KEY = "compression";
public static final Set<String> SUPPORTED_ORC_COMPRESSIONS =
ImmutableSet.of("plain", "zlib", "snappy", "zstd");
public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS =
ImmutableSet.of("plain", "snappy", "zstd");
+ public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
+ ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");
private HiveUtil() {
}
@@ -191,7 +194,6 @@ public final class HiveUtil {
Table table = new Table();
table.setDbName(hiveTable.getDbName());
table.setTableName(hiveTable.getTableName());
- // table.setOwner("");
int createTime = (int) System.currentTimeMillis() * 1000;
table.setCreateTime(createTime);
table.setLastAccessTime(createTime);
@@ -211,10 +213,10 @@ public final class HiveUtil {
setCompressType(hiveTable, props);
// set hive table comment by table properties
props.put("comment", hiveTable.getComment());
- table.setParameters(props);
if (props.containsKey("owner")) {
table.setOwner(props.get("owner"));
}
+ HiveProperties.setTableProperties(table, props);
return table;
}
@@ -232,6 +234,12 @@ public final class HiveUtil {
throw new AnalysisException("Unsupported orc compression type
" + compression);
}
props.putIfAbsent("orc.compress", StringUtils.isEmpty(compression)
? "zlib" : compression);
+ } else if (fileFormat.equalsIgnoreCase("text")) {
+ if (StringUtils.isNotEmpty(compression) &&
!SUPPORTED_TEXT_COMPRESSIONS.contains(compression)) {
+ throw new AnalysisException("Unsupported text compression type
" + compression);
+ }
+ props.putIfAbsent("text.compression",
StringUtils.isEmpty(compression)
+ ?
ConnectContext.get().getSessionVariable().hiveTextCompression() : compression);
} else {
throw new IllegalArgumentException("Compression is not supported
on " + fileFormat);
}
@@ -249,7 +257,7 @@ public final class HiveUtil {
sd.setBucketCols(bucketCols);
sd.setNumBuckets(numBuckets);
Map<String, String> parameters = new HashMap<>();
- parameters.put("tag", "doris external hive talbe");
+ parameters.put("tag", "doris external hive table");
sd.setParameters(parameters);
return sd;
}
@@ -266,6 +274,10 @@ public final class HiveUtil {
inputFormat =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
outputFormat =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
serDe =
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
+ } else if (fileFormat.equalsIgnoreCase("text")) {
+ inputFormat = "org.apache.hadoop.mapred.TextInputFormat";
+ outputFormat =
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+ serDe = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
} else {
throw new IllegalArgumentException("Creating table with an
unsupported file format: " + fileFormat);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 0dcf4724a7b..435967cef0e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -38,6 +38,7 @@ import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
@@ -57,6 +58,7 @@ import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -65,7 +67,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -77,26 +78,6 @@ import java.util.stream.Collectors;
public class HiveScanNode extends FileQueryScanNode {
private static final Logger LOG = LogManager.getLogger(HiveScanNode.class);
- public static final String PROP_FIELD_DELIMITER = "field.delim";
- public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
- public static final String PROP_LINE_DELIMITER = "line.delim";
- public static final String DEFAULT_LINE_DELIMITER = "\n";
- public static final String PROP_SEPARATOR_CHAR = "separatorChar";
- public static final String PROP_QUOTE_CHAR = "quoteChar";
- public static final String PROP_SERIALIZATION_FORMAT =
"serialization.format";
-
- public static final String PROP_COLLECTION_DELIMITER_HIVE2 =
"colelction.delim";
- public static final String PROP_COLLECTION_DELIMITER_HIVE3 =
"collection.delim";
- public static final String DEFAULT_COLLECTION_DELIMITER = "\2";
-
- public static final String PROP_MAP_KV_DELIMITER = "mapkey.delim";
- public static final String DEFAULT_MAP_KV_DELIMITER = "\003";
-
- public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
- public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
- public static final String PROP_NULL_FORMAT = "serialization.null.format";
- public static final String DEFAULT_NULL_FORMAT = "\\N";
-
protected final HMSExternalTable hmsTable;
private HiveTransaction hiveTransaction = null;
@@ -431,57 +412,21 @@ public class HiveScanNode extends FileQueryScanNode {
@Override
protected TFileAttributes getFileAttributes() throws UserException {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-
+ Table table = hmsTable.getRemoteTable();
// 1. set column separator
- Optional<String> fieldDelim =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_FIELD_DELIMITER);
- Optional<String> serFormat =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_SERIALIZATION_FORMAT);
- Optional<String> columnSeparator =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_SEPARATOR_CHAR);
-
textParams.setColumnSeparator(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_FIELD_DELIMITER, fieldDelim, columnSeparator,
serFormat)));
+
textParams.setColumnSeparator(HiveProperties.getColumnSeparator(table));
// 2. set line delimiter
- Optional<String> lineDelim =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_LINE_DELIMITER);
-
textParams.setLineDelimiter(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_LINE_DELIMITER, lineDelim)));
+ textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table));
// 3. set mapkv delimiter
- Optional<String> mapkvDelim =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_MAP_KV_DELIMITER);
-
textParams.setMapkvDelimiter(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_MAP_KV_DELIMITER, mapkvDelim)));
+ textParams.setMapkvDelimiter(HiveProperties.getMapKvDelimiter(table));
// 4. set collection delimiter
- Optional<String> collectionDelimHive2 =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_COLLECTION_DELIMITER_HIVE2);
- Optional<String> collectionDelimHive3 =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_COLLECTION_DELIMITER_HIVE3);
- textParams.setCollectionDelimiter(
-
HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_COLLECTION_DELIMITER, collectionDelimHive2,
collectionDelimHive3)));
+
textParams.setCollectionDelimiter(HiveProperties.getCollectionDelimiter(table));
// 5. set quote char
- Map<String, String> serdeParams =
hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
- if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
-
textParams.setEnclose(serdeParams.get(PROP_QUOTE_CHAR).getBytes()[0]);
- }
+ HiveProperties.getQuoteChar(table).ifPresent(d ->
textParams.setEnclose(d.getBytes()[0]));
// 6. set escape delimiter
- Optional<String> escapeDelim =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_ESCAPE_DELIMITER);
- if (escapeDelim.isPresent()) {
- String escape = HiveMetaStoreClientHelper.getByte(
- escapeDelim.get());
- if (escape != null) {
- textParams
- .setEscape(escape.getBytes()[0]);
- } else {
- textParams.setEscape(DEFAULT_ESCAPE_DELIMIER.getBytes()[0]);
- }
- }
+ HiveProperties.getEscapeDelimiter(table).ifPresent(d ->
textParams.setEscape(d.getBytes()[0]));
// 7. set null format
- Optional<String> nullFormat =
HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
- PROP_NULL_FORMAT);
-
textParams.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_NULL_FORMAT, nullFormat));
+ textParams.setNullFormat(HiveProperties.getNullFormat(table));
TFileAttributes fileAttributes = new TFileAttributes();
fileAttributes.setTextParams(textParams);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index 56ff188f964..4b4e76bea47 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -21,12 +21,10 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
import org.apache.iceberg.Table;
@@ -74,11 +72,6 @@ public class IcebergApiSource implements IcebergSource {
return icebergExtTable;
}
- @Override
- public TFileAttributes getFileAttributes() throws UserException {
- return new TFileAttributes();
- }
-
@Override
public ExternalCatalog getCatalog() {
return icebergExtTable.getCatalog();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 5e9860171d0..531f4e4ae3c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -22,14 +22,10 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
import java.util.Map;
@@ -70,18 +66,6 @@ public class IcebergHMSSource implements IcebergSource {
return hmsTable;
}
- @Override
- public TFileAttributes getFileAttributes() throws UserException {
- TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
- .getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER,
HiveScanNode.DEFAULT_FIELD_DELIMITER));
- textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER);
- TFileAttributes fileAttributes = new TFileAttributes();
- fileAttributes.setTextParams(textParams);
- fileAttributes.setHeaderType("");
- return fileAttributes;
- }
-
@Override
public ExternalCatalog getCatalog() {
return hmsTable.getCatalog();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
index b4b1bf2a805..be1ce752106 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
@@ -21,9 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.thrift.TFileAttributes;
public interface IcebergSource {
@@ -33,8 +31,6 @@ public interface IcebergSource {
TableIf getTargetTable();
- TFileAttributes getFileAttributes() throws UserException;
-
ExternalCatalog getCatalog();
String getFileFormat() throws DdlException, MetaNotFoundException;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 676241d06d8..d1f8ab411ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -25,7 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hive.HiveProperties;
import
org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
@@ -42,7 +42,9 @@ import org.apache.doris.thrift.THivePartition;
import org.apache.doris.thrift.THiveSerDeProperties;
import org.apache.doris.thrift.THiveTableSink;
+import com.google.common.base.Strings;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
import java.util.ArrayList;
import java.util.HashSet;
@@ -52,20 +54,6 @@ import java.util.Set;
import java.util.stream.Collectors;
public class HiveTableSink extends BaseExternalTableDataSink {
- public static final String PROP_FIELD_DELIMITER = "field.delim";
- public static final String DEFAULT_FIELD_DELIMITER = "\1";
- public static final String PROP_SERIALIZATION_FORMAT =
"serialization.format";
- public static final String PROP_LINE_DELIMITER = "line.delim";
- public static final String DEFAULT_LINE_DELIMITER = "\n";
- public static final String PROP_COLLECT_DELIMITER = "collection.delim";
- public static final String DEFAULT_COLLECT_DELIMITER = "\2";
- public static final String PROP_MAPKV_DELIMITER = "mapkv.delim";
- public static final String DEFAULT_MAPKV_DELIMITER = "\3";
- public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
- public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
- public static final String PROP_NULL_FORMAT = "serialization.null.format";
- public static final String DEFAULT_NULL_FORMAT = "\\N";
-
private final HMSExternalTable targetTable;
private static final HashSet<TFileFormatType> supportedTypes = new
HashSet<TFileFormatType>() {{
add(TFileFormatType.FORMAT_CSV_PLAIN);
@@ -184,10 +172,13 @@ public class HiveTableSink extends
BaseExternalTableDataSink {
compressType =
targetTable.getRemoteTable().getParameters().get("parquet.compression");
break;
case FORMAT_CSV_PLAIN:
- compressType =
ConnectContext.get().getSessionVariable().hiveTextCompression();
+ compressType =
targetTable.getRemoteTable().getParameters().get("text.compression");
+ if (Strings.isNullOrEmpty(compressType)) {
+ compressType =
ConnectContext.get().getSessionVariable().hiveTextCompression();
+ }
break;
default:
- compressType = "uncompressed";
+ compressType = "plain";
break;
}
tSink.setCompressionType(getTFileCompressType(compressType));
@@ -218,47 +209,19 @@ public class HiveTableSink extends
BaseExternalTableDataSink {
private void setSerDeProperties(THiveTableSink tSink) {
THiveSerDeProperties serDeProperties = new THiveSerDeProperties();
+ Table table = targetTable.getRemoteTable();
// 1. set field delimiter
- Optional<String> fieldDelim =
HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
- PROP_FIELD_DELIMITER);
- Optional<String> serFormat =
HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
- PROP_SERIALIZATION_FORMAT);
-
serDeProperties.setFieldDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat)));
+ serDeProperties.setFieldDelim(HiveProperties.getFieldDelimiter(table));
// 2. set line delimiter
- Optional<String> lineDelim =
HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
- PROP_LINE_DELIMITER);
-
serDeProperties.setLineDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_LINE_DELIMITER, lineDelim)));
+ serDeProperties.setLineDelim(HiveProperties.getLineDelimiter(table));
// 3. set collection delimiter
- Optional<String> collectDelim =
HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
- PROP_COLLECT_DELIMITER);
- serDeProperties
-
.setCollectionDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_COLLECT_DELIMITER, collectDelim)));
+
serDeProperties.setCollectionDelim(HiveProperties.getCollectionDelimiter(table));
// 4. set mapkv delimiter
- Optional<String> mapkvDelim =
HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
- PROP_MAPKV_DELIMITER);
-
serDeProperties.setMapkvDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_MAPKV_DELIMITER, mapkvDelim)));
+ serDeProperties.setMapkvDelim(HiveProperties.getMapKvDelimiter(table));
// 5. set escape delimiter
- Optional<String> escapeDelim =
HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
- PROP_ESCAPE_DELIMITER);
- if (escapeDelim.isPresent()) {
- String escape = HiveMetaStoreClientHelper.getByte(
- escapeDelim.get());
- if (escape != null) {
- serDeProperties
- .setEscapeChar(escape);
- } else {
- serDeProperties.setEscapeChar(DEFAULT_ESCAPE_DELIMIER);
- }
- }
+
HiveProperties.getEscapeDelimiter(table).ifPresent(serDeProperties::setEscapeChar);
// 6. set null format
- Optional<String> nullFormat =
HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
- PROP_NULL_FORMAT);
-
serDeProperties.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
- DEFAULT_NULL_FORMAT, nullFormat));
+ serDeProperties.setNullFormat(HiveProperties.getNullFormat(table));
tSink.setSerdeProperties(serDeProperties);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4ff283fd577..4964be5e58e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1153,7 +1153,7 @@ public class SessionVariable implements Serializable,
Writable {
public int sortPhaseNum = 0;
@VariableMgr.VarAttr(name = HIVE_TEXT_COMPRESSION, needForward = true)
- private String hiveTextCompression = "uncompressed";
+ private String hiveTextCompression = "plain";
@VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward =
true,
description = {"在读取csv文件时是否读取csv的空行为null",
@@ -4112,6 +4112,10 @@ public class SessionVariable implements Serializable,
Writable {
}
public String hiveTextCompression() {
+ if (hiveTextCompression.equals("uncompressed")) {
+ // This is for compatibility.
+ return "plain";
+ }
return hiveTextCompression;
}
diff --git
a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl_text_format.groovy
b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl_text_format.groovy
new file mode 100644
index 00000000000..aaa5b198e69
--- /dev/null
+++
b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl_text_format.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+/*
+ * Regression suite: creates a text-format Hive table through an HMS catalog
+ * and checks that the SHOW CREATE TABLE output mentions the expected SerDe,
+ * input/output formats and delimiter property keys. The body only runs when
+ * the "enableHiveTest" config entry equals "true" (case-insensitive).
+ */
+suite("test_hive_ddl_text_format",
"p0,external,hive,external_docker,external_docker_hive") {
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String hms_port = context.config.otherConfigs.get("hive3HmsPort")
+ String hdfs_port = context.config.otherConfigs.get("hive3HdfsPort")
+ String catalog_name = "test_hive_ddl_text_format"
+ String table_name = "table_with_pars"; // NOTE(review): not referenced anywhere else in this suite
+ // Drop any catalog left over under the same name before recreating it.
+ sql """drop catalog if exists ${catalog_name};"""
+ // HMS catalog built from the metastore and HDFS endpoints read from the config above.
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ 'type'='hms',
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hms_port}',
+ 'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}',
+ 'use_meta_cache' = 'true'
+ );
+ """
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """use `default`;"""
+ // Recreate tb_text as a gzip-compressed text table with explicit delimiter properties.
+ sql """ drop table if exists tb_text """
+ sql """
+ create table tb_text (
+ id int,
+ `name` string
+ ) PROPERTIES (
+ 'compression'='gzip',
+ 'file_format'='text',
+ 'field.delim'='\t',
+ 'line.delim'='\n',
+ 'collection.delim'=';',
+ 'mapkey.delim'=':',
+ 'serialization.null.format'='\\N'
+ );
+ """
+ // Fragments expected to appear in the SHOW CREATE TABLE output.
+ String serde = "'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'"
+ String input_format = "'org.apache.hadoop.mapred.TextInputFormat'"
+ String output_format =
"'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'"
+ String doris_fileformat = "'doris.file_format'='text'"
+ String filed_delim = "'field.delim'"
+ String line_delim = "'line.delim'"
+ String mapkey_delim = "'mapkey.delim'"
+ // Render the result to a string and match each fragment case-insensitively; only keys are checked, not delimiter values.
+ def create_tbl_res = sql """ show create table tb_text """
+ String res = create_tbl_res.toString()
+ logger.info("${res}")
+ assertTrue(res.containsIgnoreCase("${serde}"))
+ assertTrue(res.containsIgnoreCase("${input_format}"))
+ assertTrue(res.containsIgnoreCase("${output_format}"))
+ assertTrue(res.containsIgnoreCase("${doris_fileformat}"))
+ assertTrue(res.containsIgnoreCase("${filed_delim}"))
+ assertTrue(res.containsIgnoreCase("${filed_delim}")) // NOTE(review): duplicate of the previous assertion; the collection.delim key is never checked
+ assertTrue(res.containsIgnoreCase("${line_delim}"))
+ assertTrue(res.containsIgnoreCase("${mapkey_delim}"))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]