This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fcd2ee74757 [fix](branch-3.0) Fix the data type mapping for complex
types in Doris to the ORC and Parquet file formats (#45122)
fcd2ee74757 is described below
commit fcd2ee74757c1ae3520e6e70c870c05562fa5e33
Author: Tiewei Fang <[email protected]>
AuthorDate: Sun Dec 8 16:28:03 2024 +0800
[fix](branch-3.0) Fix the data type mapping for complex types in Doris to
the ORC and Parquet file formats (#45122)
picked from #44041
---
be/src/util/arrow/row_batch.cpp | 9 +-
.../data_types/serde/data_type_bitmap_serde.cpp | 59 +++++++++++-
.../vec/data_types/serde/data_type_bitmap_serde.h | 13 +--
.../data_types/serde/data_type_date64_serde.cpp | 20 +---
.../vec/data_types/serde/data_type_hll_serde.cpp | 48 +++++-----
.../vec/data_types/serde/data_type_ipv6_serde.cpp | 37 +++----
.../vec/data_types/serde/data_type_jsonb_serde.cpp | 28 +++++-
.../data_types/serde/data_type_number_serde.cpp | 36 ++-----
.../data_types/serde/data_type_object_serde.cpp | 35 +++++++
.../vec/data_types/serde/data_type_object_serde.h | 4 +-
.../serde/data_type_quantilestate_serde.h | 56 ++++++++++-
be/src/vec/data_types/serde/data_type_serde.h | 12 +++
.../org/apache/doris/analysis/OutFileClause.java | 21 ++--
.../outfile/test_outfile_complex_type.out | 25 +++++
.../outfile/test_outfile_jsonb_and_variant.out | 25 +++++
.../outfile/test_outfile_complex_type.groovy | 106 +++++++++++++++++++++
.../outfile/test_outfile_jsonb_and_variant.groovy | 104 ++++++++++++++++++++
17 files changed, 506 insertions(+), 132 deletions(-)
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index dd11d5ae46f..a0cd77aee41 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -24,6 +24,7 @@
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/type.h>
+#include <arrow/type_fwd.h>
#include <glog/logging.h>
#include <stdint.h>
@@ -84,12 +85,10 @@ Status convert_to_arrow_type(const TypeDescriptor& type,
std::shared_ptr<arrow::
case TYPE_LARGEINT:
case TYPE_VARCHAR:
case TYPE_CHAR:
- case TYPE_HLL:
case TYPE_DATE:
case TYPE_DATETIME:
case TYPE_STRING:
case TYPE_JSONB:
- case TYPE_OBJECT:
*result = arrow::utf8();
break;
case TYPE_DATEV2:
@@ -150,6 +149,12 @@ Status convert_to_arrow_type(const TypeDescriptor& type,
std::shared_ptr<arrow::
*result = arrow::utf8();
break;
}
+ case TYPE_QUANTILE_STATE:
+ case TYPE_OBJECT:
+ case TYPE_HLL: {
+ *result = arrow::binary();
+ break;
+ }
default:
return Status::InvalidArgument("Unknown primitive type({}) convert to
Arrow type",
type.type);
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
index f52a3d1e9b4..f60d054df31 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
@@ -17,6 +17,7 @@
#include "data_type_bitmap_serde.h"
+#include <arrow/array/builder_binary.h>
#include <gen_cpp/types.pb.h>
#include <string>
@@ -27,12 +28,36 @@
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
+#include "vec/data_types/serde/data_type_nullable_serde.h"
namespace doris {
namespace vectorized {
class IColumn;
+Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column,
int start_idx,
+ int end_idx,
BufferWritable& bw,
+ FormatOptions& options)
const {
+ SERIALIZE_COLUMN_TO_JSON();
+}
+
+Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column,
int row_num,
+ BufferWritable& bw,
+ FormatOptions& options)
const {
+ /**
+ * For null values in ordinary types, we use \N to represent them;
+ * for null values in nested types, we use null to represent them, just
like the json format.
+ */
+ if (_nesting_level >= 2) {
+ bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
+ strlen(NULL_IN_COMPLEX_TYPE.c_str()));
+ } else {
+ bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
+ strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
+ }
+ return Status::OK();
+}
+
Status DataTypeBitMapSerDe::deserialize_column_from_json_vector(
IColumn& column, std::vector<Slice>& slices, int* num_deserialized,
const FormatOptions& options) const {
@@ -95,6 +120,26 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const
IColumn& column, JsonbWr
result.writeEndBinary();
}
+void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int start,
+ int end, const
cctz::time_zone& ctz) const {
+ const auto& col = assert_cast<const ColumnBitmap&>(column);
+ auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
+ for (size_t string_i = start; string_i < end; ++string_i) {
+ if (null_map && (*null_map)[string_i]) {
+ checkArrowStatus(builder.AppendNull(), column.get_name(),
+ array_builder->type()->name());
+ } else {
+ auto& bitmap_value =
const_cast<BitmapValue&>(col.get_element(string_i));
+ std::string memory_buffer(bitmap_value.getSizeInBytes(), '0');
+ bitmap_value.write_to(memory_buffer.data());
+ checkArrowStatus(
+ builder.Append(memory_buffer.data(),
static_cast<int>(memory_buffer.size())),
+ column.get_name(), array_builder->type()->name());
+ }
+ }
+}
+
void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const
JsonbValue* arg) const {
auto& col = reinterpret_cast<ColumnBitmap&>(column);
auto blob = static_cast<const JsonbBlobVal*>(arg);
@@ -147,11 +192,19 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const
std::string& timezone, con
auto& col_data = assert_cast<const ColumnBitmap&>(column);
orc::StringVectorBatch* cur_batch =
dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
+ INIT_MEMORY_FOR_ORC_WRITER()
+
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
- const auto& ele = col_data.get_data_at(row_id);
- cur_batch->data[row_id] = const_cast<char*>(ele.data);
- cur_batch->length[row_id] = ele.size;
+ auto bitmap_value =
const_cast<BitmapValue&>(col_data.get_element(row_id));
+ size_t len = bitmap_value.getSizeInBytes();
+
+ REALLOC_MEMORY_FOR_ORC_WRITER()
+
+ bitmap_value.write_to(const_cast<char*>(bufferRef.data) + offset);
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
offset;
+ cur_batch->length[row_id] = len;
+ offset += len;
}
}
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index d4a643b3b16..ba7842e354c 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -36,14 +36,10 @@ public:
DataTypeBitMapSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level)
{};
Status serialize_one_cell_to_json(const IColumn& column, int row_num,
BufferWritable& bw,
- FormatOptions& options) const override {
- return Status::NotSupported("serialize_one_cell_to_json with type
[{}]", column.get_name());
- }
+ FormatOptions& options) const override;
Status serialize_column_to_json(const IColumn& column, int start_idx, int
end_idx,
- BufferWritable& bw, FormatOptions&
options) const override {
- return Status::NotSupported("serialize_column_to_json with type [{}]",
column.get_name());
- }
+ BufferWritable& bw, FormatOptions&
options) const override;
Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const
override;
@@ -63,10 +59,7 @@ public:
void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
arrow::ArrayBuilder* array_builder, int start,
int end,
- const cctz::time_zone& ctz) const override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "write_column_to_arrow with type " +
column.get_name());
- }
+ const cctz::time_zone& ctz) const override;
void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
int end, const cctz::time_zone& ctz) const
override {
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index e749b2fa2e7..4cdd6b90326 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -288,16 +288,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const
std::string& timezone, con
auto& col_data = static_cast<const
ColumnVector<Int64>&>(column).get_data();
orc::StringVectorBatch* cur_batch =
dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
- char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
- if (!ptr) {
- return Status::InternalError(
- "malloc memory error when write largeint column data to orc
file.");
- }
- StringRef bufferRef;
- bufferRef.data = ptr;
- bufferRef.size = BUFFER_UNIT_SIZE;
- size_t offset = 0;
- const size_t begin_off = offset;
+ INIT_MEMORY_FOR_ORC_WRITER()
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
@@ -309,18 +300,11 @@ Status DataTypeDate64SerDe::write_column_to_orc(const
std::string& timezone, con
REALLOC_MEMORY_FOR_ORC_WRITER()
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
- size_t data_off = 0;
- for (size_t row_id = start; row_id < end; row_id++) {
- if (cur_batch->notNull[row_id] == 1) {
- cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
begin_off + data_off;
- data_off += cur_batch->length[row_id];
- }
- }
- buffer_list.emplace_back(bufferRef);
cur_batch->numElements = end - start;
return Status::OK();
}
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index 799e1c15d63..aba3a9d0619 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -21,6 +21,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <memory>
#include <string>
#include "arrow/array/builder_binary.h"
@@ -47,28 +48,17 @@ Status DataTypeHLLSerDe::serialize_column_to_json(const
IColumn& column, int sta
Status DataTypeHLLSerDe::serialize_one_cell_to_json(const IColumn& column, int
row_num,
BufferWritable& bw,
FormatOptions& options)
const {
- if (!options._output_object_data) {
- /**
- * For null values in ordinary types, we use \N to represent them;
- * for null values in nested types, we use null to represent them,
just like the json format.
- */
- if (_nesting_level >= 2) {
- bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
- strlen(NULL_IN_COMPLEX_TYPE.c_str()));
- } else {
-
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
- strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
- }
- return Status::OK();
+ /**
+ * For null values in ordinary types, we use \N to represent them;
+ * for null values in nested types, we use null to represent them, just
like the json format.
+ */
+ if (_nesting_level >= 2) {
+ bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
+ strlen(NULL_IN_COMPLEX_TYPE.c_str()));
+ } else {
+ bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
+ strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
- auto col_row = check_column_const_set_readability(column, row_num);
- ColumnPtr ptr = col_row.first;
- row_num = col_row.second;
- auto& data = const_cast<HyperLogLog&>(assert_cast<const
ColumnHLL&>(*ptr).get_element(row_num));
- std::unique_ptr<char[]> buf =
- std::make_unique_for_overwrite<char[]>(data.max_serialized_size());
- size_t size = data.serialize((uint8*)buf.get());
- bw.write(buf.get(), size);
return Status::OK();
}
@@ -137,7 +127,7 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn&
column, const NullMa
arrow::ArrayBuilder*
array_builder, int start, int end,
const cctz::time_zone& ctz) const
{
const auto& col = assert_cast<const ColumnHLL&>(column);
- auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+ auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
checkArrowStatus(builder.AppendNull(), column.get_name(),
@@ -195,11 +185,19 @@ Status DataTypeHLLSerDe::write_column_to_orc(const
std::string& timezone, const
auto& col_data = assert_cast<const ColumnHLL&>(column);
orc::StringVectorBatch* cur_batch =
dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
+ INIT_MEMORY_FOR_ORC_WRITER()
+
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
- const auto& ele = col_data.get_data_at(row_id);
- cur_batch->data[row_id] = const_cast<char*>(ele.data);
- cur_batch->length[row_id] = ele.size;
+ auto hll_value =
const_cast<HyperLogLog&>(col_data.get_element(row_id));
+ size_t len = hll_value.max_serialized_size();
+
+ REALLOC_MEMORY_FOR_ORC_WRITER()
+
+ hll_value.serialize((uint8_t*)(bufferRef.data) + offset);
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
offset;
+ cur_batch->length[row_id] = len;
+ offset += len;
}
}
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
index 612c9ce4222..468565ad2d2 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
@@ -182,38 +182,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const
std::string& timezone, const
int end, std::vector<StringRef>&
buffer_list) const {
const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data();
orc::StringVectorBatch* cur_batch =
assert_cast<orc::StringVectorBatch*>(orc_col_batch);
- char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
- if (!ptr) {
- return Status::InternalError(
- "malloc memory error when write largeint column data to orc
file.");
- }
- StringRef bufferRef;
- bufferRef.data = ptr;
- bufferRef.size = BUFFER_UNIT_SIZE;
- size_t offset = 0;
- const size_t begin_off = offset;
-
- for (size_t row_id = start; row_id < end; row_id++) {
- if (cur_batch->notNull[row_id] == 0) {
- continue;
- }
- std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
- size_t len = ipv6_str.size();
- REALLOC_MEMORY_FOR_ORC_WRITER()
+ INIT_MEMORY_FOR_ORC_WRITER()
- strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str());
- offset += len;
- cur_batch->length[row_id] = len;
- }
- size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
- cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
begin_off + data_off;
- data_off += cur_batch->length[row_id];
+ std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
+ size_t len = ipv6_str.size();
+
+ REALLOC_MEMORY_FOR_ORC_WRITER()
+
+ strcpy(const_cast<char*>(bufferRef.data) + offset,
ipv6_str.c_str());
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
offset;
+ cur_batch->length[row_id] = len;
+ offset += len;
}
}
- buffer_list.emplace_back(bufferRef);
+
cur_batch->numElements = end - start;
return Status::OK();
}
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index 08514a6eea7..eb6c783cf28 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -21,6 +21,10 @@
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
#include "arrow/array/builder_binary.h"
#include "common/exception.h"
#include "common/status.h"
@@ -136,7 +140,29 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const
std::string& timezone, cons
const NullMap* null_map,
orc::ColumnVectorBatch*
orc_col_batch, int start,
int end,
std::vector<StringRef>& buffer_list) const {
- return Status::NotSupported("write_column_to_orc with type [{}]",
column.get_name());
+ auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
+ const auto& string_column = assert_cast<const ColumnString&>(column);
+
+ INIT_MEMORY_FOR_ORC_WRITER()
+
+ for (size_t row_id = start; row_id < end; row_id++) {
+ if (cur_batch->notNull[row_id] == 1) {
+ std::string_view string_ref =
string_column.get_data_at(row_id).to_string_view();
+ auto serialized_value = std::make_unique<std::string>(
+ JsonbToJson::jsonb_to_json_string(string_ref.data(),
string_ref.size()));
+ auto len = serialized_value->size();
+
+ REALLOC_MEMORY_FOR_ORC_WRITER()
+
+ memcpy(const_cast<char*>(bufferRef.data) + offset,
serialized_value->data(), len);
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
offset;
+ cur_batch->length[row_id] = len;
+ offset += len;
+ }
+ }
+
+ cur_batch->numElements = end - start;
+ return Status::OK();
}
void convert_jsonb_to_rapidjson(const JsonbValue& val, rapidjson::Value&
target,
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index f4fb6bbbb1f..972668e65fd 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -339,38 +339,22 @@ Status DataTypeNumberSerDe<T>::write_column_to_orc(const
std::string& timezone,
if constexpr (std::is_same_v<T, Int128>) { // largeint
orc::StringVectorBatch* cur_batch =
dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
- char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
- if (!ptr) {
- return Status::InternalError(
- "malloc memory error when write largeint column data to
orc file.");
- }
- StringRef bufferRef;
- bufferRef.data = ptr;
- bufferRef.size = BUFFER_UNIT_SIZE;
- size_t offset = 0;
- const size_t begin_off = offset;
+ INIT_MEMORY_FOR_ORC_WRITER()
for (size_t row_id = start; row_id < end; row_id++) {
- if (cur_batch->notNull[row_id] == 0) {
- continue;
- }
- std::string value_str = fmt::format("{}", col_data[row_id]);
- size_t len = value_str.size();
+ if (cur_batch->notNull[row_id] == 1) {
+ std::string value_str = fmt::format("{}", col_data[row_id]);
+ size_t len = value_str.size();
- REALLOC_MEMORY_FOR_ORC_WRITER()
+ REALLOC_MEMORY_FOR_ORC_WRITER()
- strcpy(const_cast<char*>(bufferRef.data) + offset,
value_str.c_str());
- offset += len;
- cur_batch->length[row_id] = len;
- }
- size_t data_off = 0;
- for (size_t row_id = start; row_id < end; row_id++) {
- if (cur_batch->notNull[row_id] == 1) {
- cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
begin_off + data_off;
- data_off += cur_batch->length[row_id];
+ strcpy(const_cast<char*>(bufferRef.data) + offset,
value_str.c_str());
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
offset;
+ cur_batch->length[row_id] = len;
+ offset += len;
}
}
- buffer_list.emplace_back(bufferRef);
+
cur_batch->numElements = end - start;
} else if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, UInt8>)
{ // tinyint/boolean
WRITE_INTEGRAL_COLUMN_TO_ORC(orc::ByteVectorBatch)
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp
b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index 383add39354..1b4e1caf4ac 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -19,6 +19,9 @@
#include <rapidjson/stringbuffer.h>
+#include <cstdint>
+#include <string>
+
#include "common/exception.h"
#include "common/status.h"
#include "vec/columns/column.h"
@@ -192,6 +195,38 @@ Status DataTypeObjectSerDe::write_one_cell_to_json(const
IColumn& column, rapidj
return Status::OK();
}
+Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone,
const IColumn& column,
+ const NullMap* null_map,
+ orc::ColumnVectorBatch*
orc_col_batch, int start,
+ int end,
+ std::vector<StringRef>&
buffer_list) const {
+ const auto* var = check_and_get_column<ColumnObject>(column);
+ orc::StringVectorBatch* cur_batch =
dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
+
+ INIT_MEMORY_FOR_ORC_WRITER()
+
+ for (size_t row_id = start; row_id < end; row_id++) {
+ if (cur_batch->notNull[row_id] == 1) {
+ auto serialized_value = std::make_unique<std::string>();
+ if (!var->serialize_one_row_to_string(row_id,
serialized_value.get())) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to
serialize variant {}",
+ var->dump_structure());
+ }
+ auto len = serialized_value->length();
+
+ REALLOC_MEMORY_FOR_ORC_WRITER()
+
+ memcpy(const_cast<char*>(bufferRef.data) + offset,
serialized_value->data(), len);
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
offset;
+ cur_batch->length[row_id] = len;
+ offset += len;
+ }
+ }
+
+ cur_batch->numElements = end - start;
+ return Status::OK();
+}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h
b/be/src/vec/data_types/serde/data_type_object_serde.h
index 314922f8694..41608dc3f85 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -89,9 +89,7 @@ public:
Status write_column_to_orc(const std::string& timezone, const IColumn&
column,
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch,
int start, int end,
- std::vector<StringRef>& buffer_list) const
override {
- return Status::NotSupported("write_column_to_orc with type " +
column.get_name());
- }
+ std::vector<StringRef>& buffer_list) const
override;
Status write_one_cell_to_json(const IColumn& column, rapidjson::Value&
result,
rapidjson::Document::AllocatorType&
allocator, Arena& mem_pool,
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index e91fe0166e1..e24a3a29543 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -17,6 +17,7 @@
#pragma once
+#include <arrow/array/builder_binary.h>
#include <gen_cpp/types.pb.h>
#include <stddef.h>
#include <stdint.h>
@@ -32,6 +33,7 @@
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/string_ref.h"
+#include "vec/data_types/serde/data_type_nullable_serde.h"
namespace doris {
@@ -43,12 +45,23 @@ public:
Status serialize_one_cell_to_json(const IColumn& column, int row_num,
BufferWritable& bw,
FormatOptions& options) const override {
- return Status::NotSupported("serialize_one_cell_to_json with type
[{}]", column.get_name());
+ /**
+ * For null values in ordinary types, we use \N to represent them;
+ * for null values in nested types, we use null to represent them, just
like the json format.
+ */
+ if (_nesting_level >= 2) {
+ bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
+ strlen(NULL_IN_COMPLEX_TYPE.c_str()));
+ } else {
+
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
+ strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
+ }
+ return Status::OK();
}
Status serialize_column_to_json(const IColumn& column, int start_idx, int
end_idx,
BufferWritable& bw, FormatOptions&
options) const override {
- return Status::NotSupported("serialize_column_to_json with type [{}]",
column.get_name());
+ SERIALIZE_COLUMN_TO_JSON();
}
Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const
override {
@@ -102,8 +115,21 @@ public:
void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
arrow::ArrayBuilder* array_builder, int start,
int end,
const cctz::time_zone& ctz) const override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "write_column_to_arrow with type " +
column.get_name());
+ const auto& col = assert_cast<const ColumnQuantileState&>(column);
+ auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
+ for (size_t string_i = start; string_i < end; ++string_i) {
+ if (null_map && (*null_map)[string_i]) {
+ checkArrowStatus(builder.AppendNull(), column.get_name(),
+ array_builder->type()->name());
+ } else {
+ auto& quantile_state_value =
const_cast<QuantileState&>(col.get_element(string_i));
+ std::string
memory_buffer(quantile_state_value.get_serialized_size(), '0');
+ quantile_state_value.serialize((uint8_t*)memory_buffer.data());
+ checkArrowStatus(builder.Append(memory_buffer.data(),
+
static_cast<int>(memory_buffer.size())),
+ column.get_name(),
array_builder->type()->name());
+ }
+ }
}
void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
int end, const cctz::time_zone& ctz) const
override {
@@ -126,7 +152,27 @@ public:
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch,
int start, int end,
std::vector<StringRef>& buffer_list) const
override {
- return Status::NotSupported("write_column_to_orc with type [{}]",
column.get_name());
+ auto& col_data = assert_cast<const ColumnQuantileState&>(column);
+ orc::StringVectorBatch* cur_batch =
dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
+
+ INIT_MEMORY_FOR_ORC_WRITER()
+
+ for (size_t row_id = start; row_id < end; row_id++) {
+ if (cur_batch->notNull[row_id] == 1) {
+ auto quantilestate_value =
const_cast<QuantileState&>(col_data.get_element(row_id));
+ size_t len = quantilestate_value.get_serialized_size();
+
+ REALLOC_MEMORY_FOR_ORC_WRITER()
+
+ quantilestate_value.serialize((uint8_t*)(bufferRef.data) +
offset);
+ cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) +
offset;
+ cur_batch->length[row_id] = len;
+ offset += len;
+ }
+ }
+
+ cur_batch->numElements = end - start;
+ return Status::OK();
}
private:
diff --git a/be/src/vec/data_types/serde/data_type_serde.h
b/be/src/vec/data_types/serde/data_type_serde.h
index 6caa51d2663..105b1bbaedd 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -75,6 +75,18 @@ struct ColumnVectorBatch;
++*num_deserialized;
\
}
+#define INIT_MEMORY_FOR_ORC_WRITER()
\
+ char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
\
+ if (!ptr) {
\
+ return Status::InternalError(
\
+ "malloc memory error when write largeint column data to orc
file."); \
+ }
\
+ StringRef bufferRef;
\
+ bufferRef.data = ptr;
\
+ bufferRef.size = BUFFER_UNIT_SIZE;
\
+ size_t offset = 0;
\
+ buffer_list.emplace_back(bufferRef);
+
#define REALLOC_MEMORY_FOR_ORC_WRITER()
\
while (bufferRef.size - BUFFER_RESERVED_SIZE < offset + len) {
\
char* new_ptr = (char*)malloc(bufferRef.size + BUFFER_UNIT_SIZE);
\
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index ef65b405853..026e4da29b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -36,7 +36,6 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TParquetCompressionType;
@@ -302,11 +301,8 @@ public class OutFileClause {
break;
case HLL:
case BITMAP:
- if (!(ConnectContext.get() != null && ConnectContext.get()
- .getSessionVariable().isReturnObjectDataAsBinary())) {
- break;
- }
- orcType = "string";
+ case QUANTILE_STATE:
+ orcType = "binary";
break;
case DATEV2:
orcType = "date";
@@ -327,6 +323,8 @@ public class OutFileClause {
case DATE:
case DATETIME:
case IPV6:
+ case VARIANT:
+ case JSONB:
orcType = "string";
break;
case DECIMALV2:
@@ -445,6 +443,8 @@ public class OutFileClause {
case DATE:
case DATETIME:
case IPV6:
+ case VARIANT:
+ case JSONB:
checkOrcType(schema.second, "string", true,
resultType.getPrimitiveType().toString());
break;
case DECIMAL32:
@@ -455,13 +455,8 @@ public class OutFileClause {
break;
case HLL:
case BITMAP:
- if (ConnectContext.get() != null && ConnectContext.get()
-
.getSessionVariable().isReturnObjectDataAsBinary()) {
- checkOrcType(schema.second, "string", true,
resultType.getPrimitiveType().toString());
- } else {
- throw new AnalysisException("Orc format does not
support column type: "
- + resultType.getPrimitiveType());
- }
+ case QUANTILE_STATE:
+ checkOrcType(schema.second, "binary", true,
resultType.getPrimitiveType().toString());
break;
case STRUCT:
checkOrcType(schema.second, "struct", false,
resultType.getPrimitiveType().toString());
diff --git
a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out
b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out
new file mode 100644
index 00000000000..cd6f000b6c5
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_load_parquet --
+20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6
+20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F
+20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329
+20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748
+20220201 4 00000045010000000000000040 010114CAA737BD54146E
+20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C
+
+-- !select_load_orc --
+20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6
+20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F
+20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329
+20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748
+20220201 4 00000045010000000000000040 010114CAA737BD54146E
+20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C
+
+-- !select_load_csv --
+20220201 0 \N \N \N
+20220201 1 \N \N \N
+20220201 2 \N \N \N
+20220201 3 \N \N \N
+20220201 4 \N \N \N
+20220201 5 \N \N \N
+
diff --git
a/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out
b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out
new file mode 100644
index 00000000000..d2583093964
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_load_parquet --
+20220201 0 {"k1":"100"} {"k1":"100"}
+20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"}
+20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"}
+20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123}
+20220201 4 {"k1":"100","doris":"nereids"}
{"doris":"nereids","k1":"100"}
+20220201 5 {"k1":"100","doris":"pipeline"}
{"doris":"pipeline","k1":"100"}
+
+-- !select_load_orc --
+20220201 0 {"k1":"100"} {"k1":"100"}
+20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"}
+20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"}
+20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123}
+20220201 4 {"k1":"100","doris":"nereids"}
{"doris":"nereids","k1":"100"}
+20220201 5 {"k1":"100","doris":"pipeline"}
{"doris":"pipeline","k1":"100"}
+
+-- !select_load_orc --
+20220201 0 {"k1":"100"} {"k1":"100"}
+20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"}
+20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"}
+20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123}
+20220201 4 {"k1":"100","doris":"nereids"}
{"doris":"nereids","k1":"100"}
+20220201 5 {"k1":"100","doris":"pipeline"}
{"doris":"pipeline","k1":"100"}
+
diff --git
a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy
b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy
new file mode 100644
index 00000000000..49f81732791
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_complex_type", "p0") {
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ def export_table_name = "test_outfile_complex_type_table"
+ def outFilePath = "${bucket}/outfile/complex_type/exp_"
+
+ def outfile_to_S3 = { format ->
+ // select ... into outfile ...
+ def res = sql """
+ SELECT * FROM ${export_table_name} t
+ INTO OUTFILE "s3://${outFilePath}"
+ FORMAT AS ${format}
+ PROPERTIES (
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}"
+ );
+ """
+
+ return res[0][3]
+ }
+
+ sql """ DROP TABLE IF EXISTS ${export_table_name} """
+ sql """
+ CREATE TABLE `${export_table_name}` (
+ `dt` int(11) NULL COMMENT "",
+ `id` int(11) NULL COMMENT "",
+ `price` quantile_state QUANTILE_UNION NOT NULL COMMENT "",
+ `hll_t` hll hll_union,
+ `device_id` bitmap BITMAP_UNION
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`dt`, `id`)
+ DISTRIBUTED BY HASH(`dt`)
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ INSERT INTO `${export_table_name}` values
+ (20220201,0, to_quantile_state(1, 2048), hll_hash(1), to_bitmap(243)),
+ (20220201,1, to_quantile_state(-1, 2048), hll_hash(2),
bitmap_from_array([1,2,3,4,5,434543])),
+ (20220201,2, to_quantile_state(0, 2048), hll_hash(3),
to_bitmap(1234566)),
+ (20220201,3, to_quantile_state(1, 2048), hll_hash(4),
to_bitmap(8888888888888)),
+ (20220201,4, to_quantile_state(2, 2048), hll_hash(5),
to_bitmap(98392819412234)),
+ (20220201,5, to_quantile_state(3, 2048), hll_hash(6),
to_bitmap(253234234));
+ """
+
+ // parquet file format
+ def format = "parquet"
+ def outfile_url = outfile_to_S3("${format}")
+ qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}0.${format}",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}"
+ );
+ """
+
+ // orc file format
+ format = "orc"
+ outfile_url = outfile_to_S3("${format}")
+ qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}0.${format}",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}"
+ );
+ """
+
+ // csv file format
+ format = "csv"
+ outfile_url = outfile_to_S3("${format}")
+ qt_select_load_csv """ SELECT * FROM S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}0.${format}",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}"
+ );
+ """
+}
\ No newline at end of file
diff --git
a/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy
b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy
new file mode 100644
index 00000000000..ed3019436ae
--- /dev/null
+++
b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_jsonb_and_variant", "p0") {
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ def export_table_name = "test_outfile_jsonb_and_variant_table"
+ def outFilePath = "${bucket}/outfile/jsonb_and_variant/exp_"
+
+ def outfile_to_S3 = { format ->
+ // select ... into outfile ...
+ def res = sql """
+ SELECT * FROM ${export_table_name} t
+ INTO OUTFILE "s3://${outFilePath}"
+ FORMAT AS ${format}
+ PROPERTIES (
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}"
+ );
+ """
+
+ return res[0][3]
+ }
+
+ sql """ DROP TABLE IF EXISTS ${export_table_name} """
+ sql """
+ CREATE TABLE `${export_table_name}` (
+ `dt` int(11) NULL COMMENT "",
+ `id` int(11) NULL COMMENT "",
+ `json_col` JSON NULL COMMENT "",
+ `variant_col` variant NULL COMMENT ""
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`dt`)
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ INSERT INTO `${export_table_name}` values
+ (20220201,0, '{"k1": "100"}', '{"k1": "100"}'),
+ (20220201,1, '{"k1": "100", "k2": "123"}', '{"k1": "100", "k2":
"123"}'),
+ (20220201,2, '{"k1": "100", "abc": "567"}', '{"k1": "100", "abc":
"567"}'),
+ (20220201,3, '{"k1": "100", "k3": 123}', '{"k1": "100", "k3": 123}'),
+ (20220201,4, '{"k1": "100", "doris": "nereids"}', '{"k1": "100",
"doris": "nereids"}'),
+ (20220201,5, '{"k1": "100", "doris": "pipeline"}', '{"k1": "100",
"doris": "pipeline"}');
+ """
+
+ // parquet file format
+ def format = "parquet"
+ def outfile_url = outfile_to_S3("${format}")
+ qt_select_load_parquet """ SELECT * FROM S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}0.${format}",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}"
+ );
+ """
+
+ // orc file format
+ format = "orc"
+ outfile_url = outfile_to_S3("${format}")
+ qt_select_load_orc """ SELECT * FROM S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}0.${format}",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}"
+ );
+ """
+
+ // csv file format
+ format = "csv"
+ outfile_url = outfile_to_S3("${format}")
+ qt_select_load_orc """ SELECT * FROM S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}0.${format}",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}"
+ );
+ """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]