This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c2fae109c3 [Improvement](outfile) Support output null in parquet writer (#12970)
c2fae109c3 is described below
commit c2fae109c3f54dd28c6b8cf858200dadcf344949
Author: Gabriel <[email protected]>
AuthorDate: Thu Sep 29 13:36:30 2022 +0800
[Improvement](outfile) Support output null in parquet writer (#12970)
---
be/src/vec/olap/olap_data_convertor.cpp | 228 ++++++-------------
be/src/vec/olap/olap_data_convertor.h | 150 ++-----------
be/src/vec/runtime/vdatetime_value.h | 11 -
be/src/vec/runtime/vparquet_writer.cpp | 243 +++++++++++----------
.../org/apache/doris/analysis/OutFileClause.java | 33 +--
.../java/org/apache/doris/analysis/SelectStmt.java | 2 +-
.../apache/doris/analysis/SetOperationStmt.java | 2 +-
.../org/apache/doris/analysis/SelectStmtTest.java | 4 +-
.../data/export_p0/test_outfile_parquet.out | 25 +++
.../suites/export_p0/test_outfile_parquet.groovy | 158 ++++++++++++++
10 files changed, 425 insertions(+), 431 deletions(-)
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 7a1f2cfbec..58ab5d6579 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -490,99 +490,49 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorDate::set_source_column(
         const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint24_t>::set_source_column(
typed_column, row_pos, num_rows);
- if (is_date_v2(typed_column.type)) {
- from_date_v2_ = true;
- } else {
- from_date_v2_ = false;
- }
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorDate::convert_to_olap() {
assert(_typed_column.column);
-    if (from_date_v2_) {
-        const vectorized::ColumnVector<vectorized::UInt32>* column_datetime = nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::UInt32>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::UInt32>*>(
-                    _typed_column.column.get());
-        }
+    const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
+    if (_nullmap) {
+        auto nullable_column =
+                assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
+        column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
+                nullable_column->get_nested_column_ptr().get());
+    } else {
+        column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
+                _typed_column.column.get());
+    }
- assert(column_datetime);
-
-        const DateV2Value<DateV2ValueType>* datetime_cur =
-                (const DateV2Value<DateV2ValueType>*)(column_datetime->get_data().data()) +
-                _row_pos;
-        const DateV2Value<DateV2ValueType>* datetime_end = datetime_cur + _num_rows;
- uint24_t* value = _values.data();
- if (_nullmap) {
- const UInt8* nullmap_cur = _nullmap + _row_pos;
- while (datetime_cur != datetime_end) {
- if (!*nullmap_cur) {
- *value = datetime_cur->to_olap_date();
- } else {
- // do nothing
- }
- ++value;
- ++datetime_cur;
- ++nullmap_cur;
- }
- assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
- value == _values.get_end_ptr());
- } else {
- while (datetime_cur != datetime_end) {
+ assert(column_datetime);
+
+ const VecDateTimeValue* datetime_cur =
+            (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
+ const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
+ uint24_t* value = _values.data();
+ if (_nullmap) {
+ const UInt8* nullmap_cur = _nullmap + _row_pos;
+ while (datetime_cur != datetime_end) {
+ if (!*nullmap_cur) {
*value = datetime_cur->to_olap_date();
- ++value;
- ++datetime_cur;
+ } else {
+ // do nothing
}
- assert(value == _values.get_end_ptr());
+ ++value;
+ ++datetime_cur;
+ ++nullmap_cur;
}
- return Status::OK();
+        assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == _values.get_end_ptr());
} else {
-        const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
-                    _typed_column.column.get());
-        }
-
- assert(column_datetime);
-
- const VecDateTimeValue* datetime_cur =
-                (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
- const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
- uint24_t* value = _values.data();
- if (_nullmap) {
- const UInt8* nullmap_cur = _nullmap + _row_pos;
- while (datetime_cur != datetime_end) {
- if (!*nullmap_cur) {
- *value = datetime_cur->to_olap_date();
- } else {
- // do nothing
- }
- ++value;
- ++datetime_cur;
- ++nullmap_cur;
- }
- assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
- value == _values.get_end_ptr());
- } else {
- while (datetime_cur != datetime_end) {
- *value = datetime_cur->to_olap_date();
- ++value;
- ++datetime_cur;
- }
- assert(value == _values.get_end_ptr());
+ while (datetime_cur != datetime_end) {
+ *value = datetime_cur->to_olap_date();
+ ++value;
+ ++datetime_cur;
}
- return Status::OK();
+ assert(value == _values.get_end_ptr());
}
+ return Status::OK();
}
// class OlapBlockDataConvertor::OlapColumnDataConvertorJsonb
@@ -660,99 +610,49 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::set_source_column(
         const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint64_t>::set_source_column(
typed_column, row_pos, num_rows);
- if (is_date_v2_or_datetime_v2(typed_column.type)) {
- from_datetime_v2_ = true;
- } else {
- from_datetime_v2_ = false;
- }
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::convert_to_olap() {
assert(_typed_column.column);
-    if (from_datetime_v2_) {
-        const vectorized::ColumnVector<vectorized::UInt64>* column_datetimev2 = nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetimev2 = assert_cast<const vectorized::ColumnVector<vectorized::UInt64>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetimev2 = assert_cast<const vectorized::ColumnVector<vectorized::UInt64>*>(
-                    _typed_column.column.get());
-        }
+    const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
+    if (_nullmap) {
+        auto nullable_column =
+                assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
+        column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
+                nullable_column->get_nested_column_ptr().get());
+    } else {
+        column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
+                _typed_column.column.get());
+    }
- assert(column_datetimev2);
-
-        const DateV2Value<DateTimeV2ValueType>* datetime_cur =
-                (const DateV2Value<DateTimeV2ValueType>*)(column_datetimev2->get_data().data()) +
-                _row_pos;
-        const DateV2Value<DateTimeV2ValueType>* datetime_end = datetime_cur + _num_rows;
- uint64_t* value = _values.data();
- if (_nullmap) {
- const UInt8* nullmap_cur = _nullmap + _row_pos;
- while (datetime_cur != datetime_end) {
- if (!*nullmap_cur) {
- *value = datetime_cur->to_olap_datetime();
- } else {
- // do nothing
- }
- ++value;
- ++datetime_cur;
- ++nullmap_cur;
- }
- assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
- value == _values.get_end_ptr());
- } else {
- while (datetime_cur != datetime_end) {
+ assert(column_datetime);
+
+ const VecDateTimeValue* datetime_cur =
+            (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
+ const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
+ uint64_t* value = _values.data();
+ if (_nullmap) {
+ const UInt8* nullmap_cur = _nullmap + _row_pos;
+ while (datetime_cur != datetime_end) {
+ if (!*nullmap_cur) {
*value = datetime_cur->to_olap_datetime();
- ++value;
- ++datetime_cur;
+ } else {
+ // do nothing
}
- assert(value == _values.get_end_ptr());
+ ++value;
+ ++datetime_cur;
+ ++nullmap_cur;
}
- return Status::OK();
+        assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == _values.get_end_ptr());
} else {
-        const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
-        if (_nullmap) {
-            auto nullable_column =
-                    assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
-            column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
-                    nullable_column->get_nested_column_ptr().get());
-        } else {
-            column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
-                    _typed_column.column.get());
-        }
-
- assert(column_datetime);
-
- const VecDateTimeValue* datetime_cur =
-                (const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
- const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
- uint64_t* value = _values.data();
- if (_nullmap) {
- const UInt8* nullmap_cur = _nullmap + _row_pos;
- while (datetime_cur != datetime_end) {
- if (!*nullmap_cur) {
- *value = datetime_cur->to_olap_datetime();
- } else {
- // do nothing
- }
- ++value;
- ++datetime_cur;
- ++nullmap_cur;
- }
- assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
- value == _values.get_end_ptr());
- } else {
- while (datetime_cur != datetime_end) {
- *value = datetime_cur->to_olap_datetime();
- ++value;
- ++datetime_cur;
- }
- assert(value == _values.get_end_ptr());
+ while (datetime_cur != datetime_end) {
+ *value = datetime_cur->to_olap_datetime();
+ ++value;
+ ++datetime_cur;
}
- return Status::OK();
+ assert(value == _values.get_end_ptr());
}
+ return Status::OK();
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::convert_to_olap() {
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index 961f60f4c5..9eb63c9154 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -209,9 +209,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column,
size_t row_pos,
size_t num_rows) override;
Status convert_to_olap() override;
-
- private:
- bool from_date_v2_;
};
    class OlapColumnDataConvertorDateTime : public OlapColumnDataConvertorPaddedPODArray<uint64_t> {
@@ -219,9 +216,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column,
size_t row_pos,
size_t num_rows) override;
Status convert_to_olap() override;
-
- private:
- bool from_datetime_v2_;
};
class OlapColumnDataConvertorDecimal
@@ -277,11 +271,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column,
size_t row_pos,
size_t num_rows) override {
            OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows);
- if (is_date(typed_column.type)) {
- from_date_to_date_v2_ = true;
- } else {
- from_date_to_date_v2_ = false;
- }
}
const void* get_data() const override { return values_; }
@@ -296,67 +285,24 @@ private:
}
Status convert_to_olap() override {
- if (UNLIKELY(from_date_to_date_v2_)) {
- const vectorized::ColumnVector<vectorized::Int64>*
column_datetime = nullptr;
- if (_nullmap) {
- auto nullable_column = assert_cast<const
vectorized::ColumnNullable*>(
- _typed_column.column.get());
- column_datetime =
- assert_cast<const
vectorized::ColumnVector<vectorized::Int64>*>(
-
nullable_column->get_nested_column_ptr().get());
- } else {
- column_datetime =
- assert_cast<const
vectorized::ColumnVector<vectorized::Int64>*>(
- _typed_column.column.get());
- }
-
- assert(column_datetime);
-
- const VecDateTimeValue* datetime_cur =
- (const
VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
- const VecDateTimeValue* datetime_end = datetime_cur +
_num_rows;
- uint32_t* value = const_cast<uint32_t*>(values_);
- if (_nullmap) {
- const UInt8* nullmap_cur = _nullmap + _row_pos;
- while (datetime_cur != datetime_end) {
- if (!*nullmap_cur) {
- *value = datetime_cur->to_date_v2();
- } else {
- // do nothing
- }
- ++value;
- ++datetime_cur;
- ++nullmap_cur;
- }
- } else {
- while (datetime_cur != datetime_end) {
- *value = datetime_cur->to_date_v2();
- ++value;
- ++datetime_cur;
- }
- }
- return Status::OK();
+ const vectorized::ColumnVector<uint32>* column_data = nullptr;
+ if (_nullmap) {
+ auto nullable_column =
+ assert_cast<const
vectorized::ColumnNullable*>(_typed_column.column.get());
+ column_data = assert_cast<const
vectorized::ColumnVector<uint32>*>(
+ nullable_column->get_nested_column_ptr().get());
} else {
- const vectorized::ColumnVector<uint32>* column_data = nullptr;
- if (_nullmap) {
- auto nullable_column = assert_cast<const
vectorized::ColumnNullable*>(
- _typed_column.column.get());
- column_data = assert_cast<const
vectorized::ColumnVector<uint32>*>(
- nullable_column->get_nested_column_ptr().get());
- } else {
- column_data = assert_cast<const
vectorized::ColumnVector<uint32>*>(
- _typed_column.column.get());
- }
-
- assert(column_data);
- values_ = (const uint32*)(column_data->get_data().data()) +
_row_pos;
- return Status::OK();
+ column_data = assert_cast<const
vectorized::ColumnVector<uint32>*>(
+ _typed_column.column.get());
}
+
+ assert(column_data);
+ values_ = (const uint32*)(column_data->get_data().data()) +
_row_pos;
+ return Status::OK();
}
private:
const uint32_t* values_ = nullptr;
- bool from_date_to_date_v2_;
};
    class OlapColumnDataConvertorDateTimeV2 : public OlapColumnDataConvertorBase {
@@ -367,11 +313,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column,
size_t row_pos,
size_t num_rows) override {
            OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows);
- if (is_date_or_datetime(typed_column.type)) {
- from_datetime_to_datetime_v2_ = true;
- } else {
- from_datetime_to_datetime_v2_ = false;
- }
}
const void* get_data() const override { return values_; }
@@ -386,67 +327,24 @@ private:
}
Status convert_to_olap() override {
- if (UNLIKELY(from_datetime_to_datetime_v2_)) {
- const vectorized::ColumnVector<vectorized::Int64>*
column_datetime = nullptr;
- if (_nullmap) {
- auto nullable_column = assert_cast<const
vectorized::ColumnNullable*>(
- _typed_column.column.get());
- column_datetime =
- assert_cast<const
vectorized::ColumnVector<vectorized::Int64>*>(
-
nullable_column->get_nested_column_ptr().get());
- } else {
- column_datetime =
- assert_cast<const
vectorized::ColumnVector<vectorized::Int64>*>(
- _typed_column.column.get());
- }
-
- assert(column_datetime);
-
- const VecDateTimeValue* datetime_cur =
- (const
VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
- const VecDateTimeValue* datetime_end = datetime_cur +
_num_rows;
- uint64_t* value = const_cast<uint64_t*>(values_);
- if (_nullmap) {
- const UInt8* nullmap_cur = _nullmap + _row_pos;
- while (datetime_cur != datetime_end) {
- if (!*nullmap_cur) {
- *value = datetime_cur->to_datetime_v2();
- } else {
- // do nothing
- }
- ++value;
- ++datetime_cur;
- ++nullmap_cur;
- }
- } else {
- while (datetime_cur != datetime_end) {
- *value = datetime_cur->to_datetime_v2();
- ++value;
- ++datetime_cur;
- }
- }
- return Status::OK();
+ const vectorized::ColumnVector<uint64_t>* column_data = nullptr;
+ if (_nullmap) {
+ auto nullable_column =
+ assert_cast<const
vectorized::ColumnNullable*>(_typed_column.column.get());
+ column_data = assert_cast<const
vectorized::ColumnVector<uint64_t>*>(
+ nullable_column->get_nested_column_ptr().get());
} else {
- const vectorized::ColumnVector<uint64_t>* column_data =
nullptr;
- if (_nullmap) {
- auto nullable_column = assert_cast<const
vectorized::ColumnNullable*>(
- _typed_column.column.get());
- column_data = assert_cast<const
vectorized::ColumnVector<uint64_t>*>(
- nullable_column->get_nested_column_ptr().get());
- } else {
- column_data = assert_cast<const
vectorized::ColumnVector<uint64_t>*>(
- _typed_column.column.get());
- }
-
- assert(column_data);
- values_ = (const uint64_t*)(column_data->get_data().data()) +
_row_pos;
- return Status::OK();
+ column_data = assert_cast<const
vectorized::ColumnVector<uint64_t>*>(
+ _typed_column.column.get());
}
+
+ assert(column_data);
+ values_ = (const uint64_t*)(column_data->get_data().data()) +
_row_pos;
+ return Status::OK();
}
private:
const uint64_t* values_ = nullptr;
- bool from_datetime_to_datetime_v2_;
};
// decimalv3 don't need to do any convert
diff --git a/be/src/vec/runtime/vdatetime_value.h b/be/src/vec/runtime/vdatetime_value.h
index 42ce3a173c..7af0c273b1 100644
--- a/be/src/vec/runtime/vdatetime_value.h
+++ b/be/src/vec/runtime/vdatetime_value.h
@@ -781,17 +781,6 @@ public:
return val;
}
-    uint64_t to_olap_datetime() const {
-        uint64_t date_val =
-                date_v2_value_.year_ * 10000 + date_v2_value_.month_ * 100 + date_v2_value_.day_;
-        uint64_t time_val = 0;
-        if constexpr (is_datetime) {
-            time_val = date_v2_value_.hour_ * 10000 + date_v2_value_.minute_ * 100 +
-                       date_v2_value_.second_;
-        }
-        return date_val * 1000000 + time_val;
-    }
-
bool to_format_string(const char* format, int len, char* to) const;
    bool from_date_format_str(const char* format, int format_len, const char* value,
diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp
index 14f90fc63e..9737fba854 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -89,20 +89,17 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
#define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE)
\
parquet::RowGroupWriter* rgWriter = get_rg_writer();
\
parquet::WRITER* col_writer =
static_cast<parquet::WRITER*>(rgWriter->column(i)); \
- __int128 default_value = 0;
\
if (null_map != nullptr) {
\
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) {
\
- col_writer->WriteBatch(1, nullptr, nullptr,
\
- (*null_map)[row_id] != 0
\
- ? reinterpret_cast<const
NATIVE_TYPE*>(&default_value) \
- : reinterpret_cast<const
NATIVE_TYPE*>( \
- assert_cast<const
COLUMN_TYPE&>(*col) \
-
.get_data_at(row_id) \
- .data));
\
+ def_level[row_id] = null_data[row_id] == 0;
\
}
\
+ col_writer->WriteBatch(sz, def_level.data(), nullptr,
\
+ reinterpret_cast<const NATIVE_TYPE*>(
\
+ assert_cast<const
COLUMN_TYPE&>(*col).get_data().data())); \
} else if (const auto* not_nullable_column = check_and_get_column<const
COLUMN_TYPE>(col)) { \
col_writer->WriteBatch(
\
- sz, nullptr, nullptr,
\
+ sz, nullable ? def_level.data() : nullptr, nullptr,
\
reinterpret_cast<const
NATIVE_TYPE*>(not_nullable_column->get_data().data())); \
} else {
\
RETURN_WRONG_TYPE
\
@@ -117,14 +114,17 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get());
\
DCHECK(decimal_type);
\
if (null_map != nullptr) {
\
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) {
\
- if ((*null_map)[row_id] != 0) {
\
- col_writer->WriteBatch(1, nullptr, nullptr, &value);
\
+ if (null_data[row_id] != 0) {
\
+ single_def_level = 0;
\
+ col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
\
+ single_def_level = 1;
\
} else {
\
auto s = decimal_type->to_string(*col, row_id);
\
value.ptr = reinterpret_cast<const uint8_t*>(s.data());
\
value.len = s.size();
\
- col_writer->WriteBatch(1, nullptr, nullptr, &value);
\
+ col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
\
}
\
}
\
} else {
\
@@ -132,7 +132,7 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
auto s = decimal_type->to_string(*col, row_id);
\
value.ptr = reinterpret_cast<const uint8_t*>(s.data());
\
value.len = s.size();
\
- col_writer->WriteBatch(1, nullptr, nullptr, &value);
\
+ col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr,
nullptr, &value); \
}
\
}
@@ -141,16 +141,19 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
parquet::ByteArrayWriter* col_writer =
\
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
\
if (null_map != nullptr) {
\
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) {
\
- if ((*null_map)[row_id] != 0) {
\
+ if (null_data[row_id] != 0) {
\
+ single_def_level = 0;
\
parquet::ByteArray value;
\
- col_writer->WriteBatch(1, nullptr, nullptr, &value);
\
+ col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
\
+ single_def_level = 1;
\
} else {
\
const auto& tmp = col->get_data_at(row_id);
\
parquet::ByteArray value;
\
value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);
\
value.len = tmp.size;
\
- col_writer->WriteBatch(1, nullptr, nullptr, &value);
\
+ col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
\
}
\
}
\
} else if (const auto* not_nullable_column = check_and_get_column<const
COLUMN_TYPE>(col)) { \
@@ -159,7 +162,7 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
parquet::ByteArray value;
\
value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);
\
value.len = tmp.size;
\
- col_writer->WriteBatch(1, nullptr, nullptr, &value);
\
+ col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr,
nullptr, &value); \
}
\
} else {
\
RETURN_WRONG_TYPE
\
@@ -173,22 +176,25 @@ Status VParquetWriterWrapper::write(const Block& block) {
try {
for (size_t i = 0; i < block.columns(); i++) {
auto& raw_column = block.get_by_position(i).column;
-            const auto col = raw_column->is_nullable()
-                                     ? reinterpret_cast<const ColumnNullable*>(
-                                               block.get_by_position(i).column.get())
-                                               ->get_nested_column_ptr()
-                                               .get()
-                                     : block.get_by_position(i).column.get();
-            auto null_map =
-                    raw_column->is_nullable() && reinterpret_cast<const ColumnNullable*>(
-                                                         block.get_by_position(i).column.get())
-                                                         ->get_null_map_column_ptr()
-                                                         ->has_null()
-                            ? reinterpret_cast<const ColumnNullable*>(
-                                      block.get_by_position(i).column.get())
-                                      ->get_null_map_column_ptr()
-                            : nullptr;
+            auto nullable = raw_column->is_nullable();
+            const auto col = nullable ? reinterpret_cast<const ColumnNullable*>(
+                                                block.get_by_position(i).column.get())
+                                                ->get_nested_column_ptr()
+                                                .get()
+                                      : block.get_by_position(i).column.get();
+            auto null_map = nullable && reinterpret_cast<const ColumnNullable*>(
+                                                block.get_by_position(i).column.get())
+                                                ->has_null()
+                                    ? reinterpret_cast<const ColumnNullable*>(
+                                              block.get_by_position(i).column.get())
+                                              ->get_null_map_column_ptr()
+                                    : nullptr;
             auto& type = block.get_by_position(i).type;
+
+            std::vector<int16_t> def_level(sz);
+            // For scalar type, definition level == 1 means this value is not NULL.
+            std::fill(def_level.begin(), def_level.end(), 1);
+            int16_t single_def_level = 1;
switch (_output_vexpr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN: {
                DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool)
@@ -210,63 +216,49 @@ Status VParquetWriterWrapper::write(const Block& block) {
break;
}
case TYPE_TINYINT:
- case TYPE_SMALLINT:
- case TYPE_INT: {
+ case TYPE_SMALLINT: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int32Writer* col_writer =
static_cast<parquet::Int32Writer*>(rgWriter->column(i));
- int32_t default_int32 = 0;
if (null_map != nullptr) {
- if (const auto* nested_column =
- check_and_get_column<const
ColumnVector<Int32>>(col)) {
- for (size_t row_id = 0; row_id < sz; row_id++) {
- col_writer->WriteBatch(
- 1, nullptr, nullptr,
- (*null_map)[row_id] != 0
- ? &default_int32
- : reinterpret_cast<const int32_t*>(
-
nested_column->get_data_at(row_id).data));
- }
- } else if (const auto* int16_column =
- check_and_get_column<const
ColumnVector<Int16>>(col)) {
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data();
+ if (const auto* int16_column =
+ check_and_get_column<const
ColumnVector<Int16>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ }
const int32_t tmp =
int16_column->get_data()[row_id];
- col_writer->WriteBatch(
- 1, nullptr, nullptr,
- (*null_map)[row_id] != 0
- ? &default_int32
- : reinterpret_cast<const
int32_t*>(&tmp));
+ col_writer->WriteBatch(1, &single_def_level,
nullptr,
+ reinterpret_cast<const
int32_t*>(&tmp));
+ single_def_level = 1;
}
} else if (const auto* int8_column =
check_and_get_column<const
ColumnVector<Int8>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ }
const int32_t tmp =
int8_column->get_data()[row_id];
- col_writer->WriteBatch(
- 1, nullptr, nullptr,
- (*null_map)[row_id] != 0
- ? &default_int32
- : reinterpret_cast<const
int32_t*>(&tmp));
+ col_writer->WriteBatch(1, &single_def_level,
nullptr,
+ reinterpret_cast<const
int32_t*>(&tmp));
+ single_def_level = 1;
}
} else {
RETURN_WRONG_TYPE
}
- } else if (const auto* not_nullable_column =
- check_and_get_column<const
ColumnVector<Int32>>(col)) {
- col_writer->WriteBatch(sz, nullptr, nullptr,
- reinterpret_cast<const int32_t*>(
-
not_nullable_column->get_data().data()));
} else if (const auto& int16_column =
check_and_get_column<const
ColumnVector<Int16>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int16_column->get_data()[row_id];
- col_writer->WriteBatch(1, nullptr, nullptr,
+ col_writer->WriteBatch(1, nullable ? def_level.data()
: nullptr, nullptr,
reinterpret_cast<const
int32_t*>(&tmp));
}
} else if (const auto& int8_column =
check_and_get_column<const
ColumnVector<Int8>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int8_column->get_data()[row_id];
- col_writer->WriteBatch(1, nullptr, nullptr,
+ col_writer->WriteBatch(1, nullable ? def_level.data()
: nullptr, nullptr,
reinterpret_cast<const
int32_t*>(&tmp));
}
} else {
@@ -274,25 +266,34 @@ Status VParquetWriterWrapper::write(const Block& block) {
}
break;
}
+ case TYPE_INT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(Int32Writer, ColumnVector<Int32>, Int32)
+ break;
+ }
case TYPE_DATETIME:
case TYPE_DATE: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int64Writer* col_writer =
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
- int64_t default_int64 = 0;
+ uint64_t default_int64 = 0;
if (null_map != nullptr) {
+                auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
                 for (size_t row_id = 0; row_id < sz; row_id++) {
-                    if ((*null_map)[row_id] != 0) {
-                        col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                    def_level[row_id] = null_data[row_id] == 0;
+                }
+                uint64_t tmp_data[sz];
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    if (null_data[row_id] != 0) {
+                        tmp_data[row_id] = default_int64;
                     } else {
-                        const auto tmp = binary_cast<Int64, VecDateTimeValue>(
-                                                 assert_cast<const ColumnVector<Int64>&>(*col)
-                                                         .get_data()[row_id])
-                                                 .to_olap_datetime();
-                        col_writer->WriteBatch(1, nullptr, nullptr,
-                                               reinterpret_cast<const int64_t*>(&tmp));
+                        tmp_data[row_id] = binary_cast<Int64, VecDateTimeValue>(
+                                                   assert_cast<const ColumnVector<Int64>&>(*col)
+                                                           .get_data()[row_id])
+                                                   .to_olap_datetime();
                     }
                 }
+                col_writer->WriteBatch(sz, def_level.data(), nullptr,
+                                       reinterpret_cast<const int64_t*>(tmp_data));
} else if (const auto* not_nullable_column =
check_and_get_column<const
ColumnVector<Int64>>(col)) {
std::vector<uint64_t> res(sz);
@@ -301,7 +302,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
not_nullable_column->get_data()[row_id])
.to_olap_datetime();
}
-                col_writer->WriteBatch(sz, nullptr, nullptr,
+                col_writer->WriteBatch(sz, nullable ? def_level.data() : nullptr, nullptr,
                                        reinterpret_cast<const int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
@@ -310,32 +311,39 @@ Status VParquetWriterWrapper::write(const Block& block) {
}
case TYPE_DATEV2: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
- parquet::Int64Writer* col_writer =
-
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
- int64_t default_int64 = 0;
+ parquet::ByteArrayWriter* col_writer =
+
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+ parquet::ByteArray value;
if (null_map != nullptr) {
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
- if ((*null_map)[row_id] != 0) {
- col_writer->WriteBatch(1, nullptr, nullptr,
&default_int64);
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ single_def_level = 1;
} else {
- uint64_t tmp = binary_cast<UInt32,
DateV2Value<DateV2ValueType>>(
- assert_cast<const
ColumnVector<UInt32>&>(*col)
- .get_data()[row_id])
- .to_olap_datetime();
- col_writer->WriteBatch(1, nullptr, nullptr,
- reinterpret_cast<const
int64_t*>(&tmp));
+ char buffer[30];
+ int output_scale =
_output_vexpr_ctxs[i]->root()->type().scale;
+ value.ptr = reinterpret_cast<const
uint8_t*>(buffer);
+ value.len = binary_cast<UInt32,
DateV2Value<DateV2ValueType>>(
+ assert_cast<const
ColumnVector<UInt32>&>(*col)
+ .get_data()[row_id])
+ .to_buffer(buffer,
output_scale);
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const
ColumnVector<UInt32>>(col)) {
- std::vector<uint64_t> res(sz);
for (size_t row_id = 0; row_id < sz; row_id++) {
- res[row_id] = binary_cast<UInt32,
DateV2Value<DateV2ValueType>>(
-
not_nullable_column->get_data()[row_id])
- .to_olap_datetime();
+ char buffer[30];
+ int output_scale =
_output_vexpr_ctxs[i]->root()->type().scale;
+ value.ptr = reinterpret_cast<const uint8_t*>(buffer);
+ value.len = binary_cast<UInt32,
DateV2Value<DateV2ValueType>>(
+
not_nullable_column->get_data()[row_id])
+ .to_buffer(buffer, output_scale);
+ col_writer->WriteBatch(1, nullable ? &single_def_level
: nullptr, nullptr,
+ &value);
}
- col_writer->WriteBatch(sz, nullptr, nullptr,
- reinterpret_cast<const
int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
}
@@ -343,32 +351,39 @@ Status VParquetWriterWrapper::write(const Block& block) {
}
case TYPE_DATETIMEV2: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
- parquet::Int64Writer* col_writer =
-
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
- int64_t default_int64 = 0;
+ parquet::ByteArrayWriter* col_writer =
+
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+ parquet::ByteArray value;
if (null_map != nullptr) {
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
- if ((*null_map)[row_id] != 0) {
- col_writer->WriteBatch(1, nullptr, nullptr,
&default_int64);
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ single_def_level = 1;
} else {
- uint64_t tmp = binary_cast<UInt64,
DateV2Value<DateTimeV2ValueType>>(
- assert_cast<const
ColumnVector<UInt64>&>(*col)
- .get_data()[row_id])
- .to_olap_datetime();
- col_writer->WriteBatch(1, nullptr, nullptr,
- reinterpret_cast<const
int64_t*>(&tmp));
+ char buffer[30];
+ int output_scale =
_output_vexpr_ctxs[i]->root()->type().scale;
+ value.ptr = reinterpret_cast<const
uint8_t*>(buffer);
+ value.len = binary_cast<UInt64,
DateV2Value<DateTimeV2ValueType>>(
+ assert_cast<const
ColumnVector<UInt64>&>(*col)
+ .get_data()[row_id])
+ .to_buffer(buffer,
output_scale);
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const
ColumnVector<UInt64>>(col)) {
- std::vector<uint64_t> res(sz);
for (size_t row_id = 0; row_id < sz; row_id++) {
- res[row_id] = binary_cast<UInt64,
DateV2Value<DateTimeV2ValueType>>(
-
not_nullable_column->get_data()[row_id])
- .to_olap_datetime();
+ char buffer[30];
+ int output_scale =
_output_vexpr_ctxs[i]->root()->type().scale;
+ value.ptr = reinterpret_cast<const uint8_t*>(buffer);
+ value.len = binary_cast<UInt64,
DateV2Value<DateTimeV2ValueType>>(
+
not_nullable_column->get_data()[row_id])
+ .to_buffer(buffer, output_scale);
+ col_writer->WriteBatch(1, nullable ? &single_def_level
: nullptr, nullptr,
+ &value);
}
- col_writer->WriteBatch(sz, nullptr, nullptr,
- reinterpret_cast<const
int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
}
@@ -402,9 +417,12 @@ Status VParquetWriterWrapper::write(const Block& block) {
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
parquet::ByteArray value;
if (null_map != nullptr) {
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
- if ((*null_map)[row_id] != 0) {
- col_writer->WriteBatch(1, nullptr, nullptr,
&value);
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ single_def_level = 1;
} else {
const DecimalV2Value
decimal_val(reinterpret_cast<const PackedInt128*>(
col->get_data_at(row_id).data)
@@ -413,7 +431,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
int output_scale =
_output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const
uint8_t*>(decimal_buffer);
value.len = decimal_val.to_buffer(decimal_buffer,
output_scale);
- col_writer->WriteBatch(1, nullptr, nullptr,
&value);
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
}
}
} else if (const auto* not_nullable_column =
@@ -427,7 +445,8 @@ Status VParquetWriterWrapper::write(const Block& block) {
int output_scale =
_output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const
uint8_t*>(decimal_buffer);
value.len = decimal_val.to_buffer(decimal_buffer,
output_scale);
- col_writer->WriteBatch(1, nullptr, nullptr, &value);
+ col_writer->WriteBatch(1, nullable ? &single_def_level
: nullptr, nullptr,
+ &value);
}
} else {
RETURN_WRONG_TYPE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 64f7ec6737..fdb88b1ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -190,7 +190,7 @@ public class OutFileClause {
return parquetSchemas;
}
-    public void analyze(Analyzer analyzer, List<Expr> resultExprs) throws UserException {
+    public void analyze(Analyzer analyzer, List<Expr> resultExprs, List<String> colLabels) throws UserException {
if (isAnalyzed) {
            // If the query stmt is rewritten, the whole stmt will be analyzed again.
// But some of fields in this OutfileClause has been changed,
@@ -229,13 +229,13 @@ public class OutFileClause {
isAnalyzed = true;
if (isParquetFormat()) {
- analyzeForParquetFormat(resultExprs);
+ analyzeForParquetFormat(resultExprs, colLabels);
}
}
-    private void analyzeForParquetFormat(List<Expr> resultExprs) throws AnalysisException {
+    private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
if (this.parquetSchemas.isEmpty()) {
- genParquetSchema(resultExprs);
+ genParquetSchema(resultExprs, colLabels);
}
// check schema number
@@ -265,10 +265,8 @@ public class OutFileClause {
case BIGINT:
case DATE:
case DATETIME:
- case DATETIMEV2:
- case DATEV2:
if (!PARQUET_DATA_TYPE_MAP.get("int64").equals(type)) {
- throw new AnalysisException("project field type is
BIGINT/DATE/DATETIME/DATEV2/DATETIMEV2,"
+ throw new AnalysisException("project field type is
BIGINT/DATE/DATETIME,"
+ "should use int64, but the definition type
of column " + i + " is " + type);
}
break;
@@ -291,9 +289,12 @@ public class OutFileClause {
case DECIMAL64:
case DECIMAL128:
case DECIMALV2:
+ case DATETIMEV2:
+ case DATEV2:
if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type))
{
- throw new AnalysisException("project field type is
CHAR/VARCHAR/STRING/DECIMAL,"
- + " should use byte_array, but the definition
type of column " + i + " is " + type);
+ throw new AnalysisException("project field type is
CHAR/VARCHAR/STRING/DECIMAL/DATEV2"
+ + "/DATETIMEV2, should use byte_array, but the
definition type of column "
+ + i + " is " + type);
}
break;
case HLL:
@@ -316,12 +317,16 @@ public class OutFileClause {
}
}
-    private void genParquetSchema(List<Expr> resultExprs) throws AnalysisException {
+    private void genParquetSchema(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
Preconditions.checkState(this.parquetSchemas.isEmpty());
for (int i = 0; i < resultExprs.size(); ++i) {
Expr expr = resultExprs.get(i);
TParquetSchema parquetSchema = new TParquetSchema();
-            parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("required");
+            if (resultExprs.get(i).isNullable()) {
+                parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("optional");
+            } else {
+                parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("required");
+            }
switch (expr.getType().getPrimitiveType()) {
case BOOLEAN:
parquetSchema.schema_data_type =
PARQUET_DATA_TYPE_MAP.get("boolean");
@@ -334,8 +339,6 @@ public class OutFileClause {
case BIGINT:
case DATE:
case DATETIME:
- case DATETIMEV2:
- case DATEV2:
parquetSchema.schema_data_type =
PARQUET_DATA_TYPE_MAP.get("int64");
break;
case FLOAT:
@@ -351,6 +354,8 @@ public class OutFileClause {
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
+ case DATETIMEV2:
+ case DATEV2:
parquetSchema.schema_data_type =
PARQUET_DATA_TYPE_MAP.get("byte_array");
break;
case HLL:
@@ -364,7 +369,7 @@ public class OutFileClause {
throw new AnalysisException("currently parquet do not
support column type: "
+ expr.getType().getPrimitiveType());
}
- parquetSchema.schema_column_name = "col" + i;
+ parquetSchema.schema_column_name = colLabels.get(i);
parquetSchemas.add(parquetSchema);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index e29a68d375..5645219af7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -559,7 +559,7 @@ public class SelectStmt extends QueryStmt {
}
}
if (hasOutFileClause()) {
- outFileClause.analyze(analyzer, resultExprs);
+ outFileClause.analyze(analyzer, resultExprs, colLabels);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
index b3ef8269e6..c8cb2a8ccb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
@@ -303,7 +303,7 @@ public class SetOperationStmt extends QueryStmt {
baseTblResultExprs = resultExprs;
if (hasOutFileClause()) {
- outFileClause.analyze(analyzer, resultExprs);
+ outFileClause.analyze(analyzer, resultExprs, getColLabels());
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 6932bbcaab..4945a59690 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -640,11 +640,11 @@ public class SelectStmtTest {
try {
             SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
             Assert.assertEquals(1, stmt.getOutFileClause().getParquetSchemas().size());
-            Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("required"),
+            Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("optional"),
                     stmt.getOutFileClause().getParquetSchemas().get(0).schema_repetition_type);
             Assert.assertEquals(stmt.getOutFileClause().PARQUET_DATA_TYPE_MAP.get("byte_array"),
                     stmt.getOutFileClause().getParquetSchemas().get(0).schema_data_type);
-            Assert.assertEquals("col0", stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
+            Assert.assertEquals("k1", stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
diff --git a/regression-test/data/export_p0/test_outfile_parquet.out b/regression-test/data/export_p0/test_outfile_parquet.out
new file mode 100644
index 0000000000..cb6eab3268
--- /dev/null
+++ b/regression-test/data/export_p0/test_outfile_parquet.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
+2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
+3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
+4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
+5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
+6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
+7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
+8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
+9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
+10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
+
+-- !select_default --
+1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
+2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
+3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
+4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
+5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
+6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
+7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
+8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
+9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
+10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
+
diff --git a/regression-test/suites/export_p0/test_outfile_parquet.groovy b/regression-test/suites/export_p0/test_outfile_parquet.groovy
new file mode 100644
index 0000000000..2804b1e0e5
--- /dev/null
+++ b/regression-test/suites/export_p0/test_outfile_parquet.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_parquet") {
+ def dbName = "test_query_db"
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "USE $dbName"
+ StringBuilder strBuilder = new StringBuilder()
+ strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser
+ ":" + context.config.jdbcPassword)
+ strBuilder.append(" http://" + context.config.feHttpAddress +
"/rest/v1/config/fe")
+
+ String command = strBuilder.toString()
+ def process = command.toString().execute()
+ def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+ def out = process.getText()
+ logger.info("Request FE Config: code=" + code + ", out=" + out + ", err="
+ err)
+ assertEquals(code, 0)
+ def response = parseJson(out.trim())
+ assertEquals(response.code, 0)
+ assertEquals(response.msg, "success")
+ def configJson = response.data.rows
+ boolean enableOutfileToLocal = false
+ for (Object conf: configJson) {
+ assert conf instanceof Map
+ if (((Map<String, String>) conf).get("Name").toLowerCase() ==
"enable_outfile_to_local") {
+ enableOutfileToLocal = ((Map<String, String>)
conf).get("Value").toLowerCase() == "true"
+ }
+ }
+ if (!enableOutfileToLocal) {
+ logger.warn("Please set enable_outfile_to_local to true to run
test_outfile")
+ return
+ }
+ def tableName = "outfile_parquet_test"
+ def tableName2 = "outfile_parquet_test2"
+ def outFilePath = """${context.file.parent}/tmp"""
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+                `user_id` INT NOT NULL COMMENT "user id",
+                `date` DATE NOT NULL COMMENT "data load datetime",
+                `datetime` DATETIME NOT NULL COMMENT "data load datetime",
+ `date_1` DATEV2 NOT NULL COMMENT "",
+ `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
+ `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
+ `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
+                `city` VARCHAR(20) COMMENT "city where the user lives",
+                `age` SMALLINT COMMENT "user age",
+                `sex` TINYINT COMMENT "user gender",
+ `bool_col` boolean COMMENT "",
+ `int_col` int COMMENT "",
+ `bigint_col` bigint COMMENT "",
+ `largeint_col` int COMMENT "",
+ `float_col` float COMMENT "",
+ `double_col` double COMMENT "",
+ `char_col` CHAR(10) COMMENT "",
+ `decimal_col` decimal COMMENT ""
+ )
+ DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+ """
+ StringBuilder sb = new StringBuilder()
+ int i = 1
+ for (; i < 10; i ++) {
+ sb.append("""
+                (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
+ """)
+ }
+ sb.append("""
+            (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
+ """)
+ sql """ INSERT INTO ${tableName} VALUES
+ ${sb.toString()}
+ """
+ qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
+
+ // check outfile
+ File path = new File(outFilePath)
+ if (!path.exists()) {
+ assert path.mkdirs()
+ } else {
+            throw new IllegalStateException("""${outFilePath} already exists! """)
+ }
+ sql """
+            SELECT * FROM ${tableName} t ORDER BY user_id INTO OUTFILE "file://${outFilePath}/" FORMAT AS PARQUET;
+ """
+
+ File[] files = path.listFiles()
+ assert files.length == 1
+
+ sql """ DROP TABLE IF EXISTS ${tableName2} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName2} (
+                `user_id` INT NOT NULL COMMENT "user id",
+                `date` DATE NOT NULL COMMENT "data load datetime",
+                `datetime` DATETIME NOT NULL COMMENT "data load datetime",
+ `date_1` DATEV2 NOT NULL COMMENT "",
+ `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
+ `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
+ `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
+                `city` VARCHAR(20) COMMENT "city where the user lives",
+                `age` SMALLINT COMMENT "user age",
+                `sex` TINYINT COMMENT "user gender",
+ `bool_col` boolean COMMENT "",
+ `int_col` int COMMENT "",
+ `bigint_col` bigint COMMENT "",
+ `largeint_col` int COMMENT "",
+ `float_col` float COMMENT "",
+ `double_col` double COMMENT "",
+ `char_col` CHAR(10) COMMENT "",
+ `decimal_col` decimal COMMENT ""
+ )
+ DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+ """
+
+ StringBuilder commandBuilder = new StringBuilder()
+ commandBuilder.append("""curl -v --location-trusted -u
${context.config.feHttpUser}:${context.config.feHttpPassword}""")
+ commandBuilder.append(""" -H format:parquet -T """ +
files[0].getAbsolutePath() + """ http://${context.config.feHttpAddress}/api/"""
+ dbName + "/" + tableName2 + "/_stream_load")
+ command = commandBuilder.toString()
+ process = command.execute()
+ code = process.waitFor()
+    err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
+ out = process.getText()
+ logger.info("Run command: command=" + command + ",code=" + code + ",
out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ qt_select_default """ SELECT * FROM ${tableName2} t ORDER BY user_id;
"""
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${tableName}")
+ try_sql("DROP TABLE IF EXISTS ${tableName2}")
+ File path = new File(outFilePath)
+ if (path.exists()) {
+ for (File f: path.listFiles()) {
+ f.delete();
+ }
+ path.delete();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]