This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a676ae4275d [fix](parquet) Don't decompress dict page when dict page
is empty (#60374)
a676ae4275d is described below
commit a676ae4275d7959a41775f0c3733ba4ef9d6cdc2
Author: Lijia Liu <[email protected]>
AuthorDate: Fri Mar 13 01:41:58 2026 +0800
[fix](parquet) Don't decompress dict page when dict page is empty (#60374)
When all values of a string column are null, its dictionary page may be empty,
and reading the file fails with the following error:
```
[INTERNAL_ERROR]Read parquet file hdfs://xxx.0.parq failed, reason = [INVALID_ARGUMENT]ZSTD_decompressDCtx error: Unknown frame descriptor. cur path: hdfs://xxx.0.parq
```
We don't need to decompress the dict page data when the dict page is empty;
instead, we skip the page data and cache the empty data as the decompressed data.
Co-authored-by: liutang123 <[email protected]>
---
be/src/common/status.h | 11 +
be/src/format/parquet/byte_array_dict_decoder.cpp | 12 +-
be/src/format/parquet/byte_array_dict_decoder.h | 3 +-
be/src/format/parquet/decoder.h | 3 +-
be/src/format/parquet/fix_length_dict_decoder.hpp | 13 +-
.../parquet/vparquet_column_chunk_reader.cpp | 20 +-
.../format/parquet/vparquet_column_chunk_reader.h | 2 +-
be/src/format/parquet/vparquet_column_reader.cpp | 2 +-
be/src/format/parquet/vparquet_column_reader.h | 6 +-
be/src/format/parquet/vparquet_group_reader.cpp | 23 +-
be/src/format/parquet/vparquet_group_reader.h | 2 +-
be/src/format/parquet/vparquet_page_reader.h | 1 +
.../parquet_scanner/test_string_null.zst.parquet | Bin 0 -> 925 bytes
.../byte_array_dict_decoder_empty_dict_test.cpp | 53 ++
.../parquet/byte_array_dict_decoder_test.cpp | 4 +-
.../fix_length_dict_decoder_empty_dict_test.cpp | 59 +++
.../parquet/fix_length_dict_decoder_test.cpp | 4 +-
be/test/format/parquet/parquet_reader_test.cpp | 542 ++++++++++++++++++++-
18 files changed, 731 insertions(+), 29 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index e074e3a8ed0..412bf0be019 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -14,6 +14,7 @@
#include <memory>
#include <string>
#include <string_view>
+#include <type_traits>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -757,6 +758,16 @@ using ResultError = unexpected<Status>;
std::forward<T>(res).value();
\
})
+#define TEST_RESULT_ERROR(stmt)
\
+ ({
\
+ auto&& _result_ = (stmt);
\
+ using _result_t = std::decay_t<decltype(_result_)>;
\
+ if (_result_.has_value()) [[unlikely]] {
\
+ ASSERT_FALSE(_result_.has_value()) << "Expected ResultError, but
got success result"; \
+ }
\
+ std::forward<_result_t>(_result_).error();
\
+ })
+
// core in Debug mode, exception in Release mode.
#define DORIS_CHECK(stmt)
\
do {
\
diff --git a/be/src/format/parquet/byte_array_dict_decoder.cpp
b/be/src/format/parquet/byte_array_dict_decoder.cpp
index 9b685a075e2..f1678e151e4 100644
--- a/be/src/format/parquet/byte_array_dict_decoder.cpp
+++ b/be/src/format/parquet/byte_array_dict_decoder.cpp
@@ -78,9 +78,17 @@ Status
ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_
return Status::OK();
}
-MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
+Result<MutableColumnPtr>
ByteArrayDictDecoder::convert_dict_column_to_string_column(
const ColumnInt32* dict_column) {
auto res = ColumnString::create();
+ if (_dict_items.empty()) {
+ if (dict_column->size() > 0) {
+ LOG(ERROR) << "Attempt to convert dict column with empty
dictionary, column size: "
+ << dict_column->size();
+ return ResultError(Status::IOError("empty dictionary"));
+ }
+ return res;
+ }
std::vector<StringRef> dict_values(dict_column->size());
const auto& data = dict_column->get_data();
for (size_t i = 0; i < dict_column->size(); ++i) {
@@ -106,7 +114,7 @@ Status
ByteArrayDictDecoder::_decode_values(MutableColumnPtr& doris_column, Data
size_t non_null_size = select_vector.num_values() -
select_vector.num_nulls();
if (doris_column->is_column_dictionary()) {
ColumnDictI32& dict_column =
assert_cast<ColumnDictI32&>(*doris_column);
- if (dict_column.dict_size() == 0) {
+ if (dict_column.dict_size() == 0 && !_dict_items.empty()) {
//If the dictionary grows too big, whether in size or number of
distinct values,
// the encoding will fall back to the plain encoding.
dict_column.insert_many_dict_data(_dict_items.data(),
diff --git a/be/src/format/parquet/byte_array_dict_decoder.h
b/be/src/format/parquet/byte_array_dict_decoder.h
index 28d82f04ef0..d5de83658a5 100644
--- a/be/src/format/parquet/byte_array_dict_decoder.h
+++ b/be/src/format/parquet/byte_array_dict_decoder.h
@@ -54,7 +54,8 @@ public:
Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;
- MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) override;
+ Result<MutableColumnPtr> convert_dict_column_to_string_column(
+ const ColumnInt32* dict_column) override;
protected:
// For dictionary encoding
diff --git a/be/src/format/parquet/decoder.h b/be/src/format/parquet/decoder.h
index 9a2dd7cd7a2..46e4a1fea6d 100644
--- a/be/src/format/parquet/decoder.h
+++ b/be/src/format/parquet/decoder.h
@@ -80,7 +80,8 @@ public:
return Status::NotSupported("read_dict_values_to_column is not
supported");
}
- virtual MutableColumnPtr convert_dict_column_to_string_column(const
ColumnInt32* dict_column) {
+ virtual Result<MutableColumnPtr> convert_dict_column_to_string_column(
+ const ColumnInt32* dict_column) {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"Method convert_dict_column_to_string_column is
not supported");
}
diff --git a/be/src/format/parquet/fix_length_dict_decoder.hpp
b/be/src/format/parquet/fix_length_dict_decoder.hpp
index 721d9165af6..d2087223eea 100644
--- a/be/src/format/parquet/fix_length_dict_decoder.hpp
+++ b/be/src/format/parquet/fix_length_dict_decoder.hpp
@@ -81,7 +81,7 @@ public:
ColumnSelectVector& select_vector, bool
is_dict_filter) {
size_t non_null_size = select_vector.num_values() -
select_vector.num_nulls();
if (doris_column->is_column_dictionary() &&
- assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
+ assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0 &&
!_dict_items.empty()) {
std::vector<StringRef> dict_items;
char* dict_item_address = (char*)_dict.get();
@@ -211,8 +211,17 @@ protected:
return Status::OK();
}
- MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) override {
+ Result<MutableColumnPtr> convert_dict_column_to_string_column(
+ const ColumnInt32* dict_column) override {
auto res = ColumnString::create();
+ if (_dict_items.empty()) {
+ if (dict_column->size() > 0) {
+ LOG(ERROR) << "Attempt to convert dict column with empty
dictionary, column size: "
+ << dict_column->size();
+ return ResultError(Status::IOError("empty dictionary"));
+ }
+ return res;
+ }
std::vector<StringRef> dict_values;
dict_values.reserve(dict_column->size());
const auto& data = dict_column->get_data();
diff --git a/be/src/format/parquet/vparquet_column_chunk_reader.cpp
b/be/src/format/parquet/vparquet_column_chunk_reader.cpp
index a73c4485a6e..d4cfdfa01dc 100644
--- a/be/src/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/format/parquet/vparquet_column_chunk_reader.cpp
@@ -417,12 +417,21 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::_decode_dict_page() {
if (!dict_loaded) {
// Load and decompress dictionary page from file
if (_block_compress_codec != nullptr) {
+ auto dict_num = header->dictionary_page_header.num_values;
+ if (dict_num == 0 && uncompressed_size != 0) {
+ return Status::IOError(
+ "Dictionary page's num_values is {} but
uncompressed_size is {}", dict_num,
+ uncompressed_size);
+ }
Slice compressed_data;
- RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
Slice dict_slice(dict_data.get(), uncompressed_size);
- RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&dict_slice));
+ if (dict_num != 0) {
+ RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&dict_slice));
+ }
// Decide whether to cache decompressed or compressed dictionary
based on threshold
+ // If uncompressed_page_size == 0, should_cache_decompressed will
return true
bool cache_payload_decompressed =
should_cache_decompressed(header, _metadata);
if (_page_read_ctx.enable_parquet_file_page_cache &&
@@ -431,10 +440,12 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::_decode_dict_page() {
std::vector<uint8_t> empty_levels; // Dictionary pages don't
have levels
if (cache_payload_decompressed) {
// Cache the decompressed dictionary page
+ // If dict_num == 0, `dict_slice` will be empty
_insert_page_into_cache(empty_levels, dict_slice);
_chunk_statistics.page_cache_decompressed_write_counter +=
1;
} else {
if (config::enable_parquet_cache_compressed_pages) {
+ DCHECK(!compressed_data.empty());
// Cache the compressed dictionary page
_insert_page_into_cache(empty_levels,
Slice(compressed_data.data,
compressed_data.size));
@@ -442,6 +453,11 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::_decode_dict_page() {
}
}
}
+ // `get_page_data` not called, we should skip the page data
+ // Because `_insert_page_into_cache` will use _page_reader, we
should exec `skip_page_data` after `_insert_page_into_cache`
+ if (dict_num == 0) {
+ _page_reader->skip_page_data();
+ }
} else {
Slice dict_slice;
RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
diff --git a/be/src/format/parquet/vparquet_column_chunk_reader.h
b/be/src/format/parquet/vparquet_column_chunk_reader.h
index 7b0f8c01756..85c71021d41 100644
--- a/be/src/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/format/parquet/vparquet_column_chunk_reader.h
@@ -178,7 +178,7 @@ public:
->read_dict_values_to_column(doris_column);
}
- MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) {
+ Result<MutableColumnPtr> convert_dict_column_to_string_column(const
ColumnInt32* dict_column) {
return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]
->convert_dict_column_to_string_column(dict_column);
}
diff --git a/be/src/format/parquet/vparquet_column_reader.cpp
b/be/src/format/parquet/vparquet_column_reader.cpp
index 6670c454302..9bdf3719b4a 100644
--- a/be/src/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/format/parquet/vparquet_column_reader.cpp
@@ -515,7 +515,7 @@ Status ScalarColumnReader<IN_COLLECTION,
OFFSET_INDEX>::read_dict_values_to_colu
return Status::OK();
}
template <bool IN_COLLECTION, bool OFFSET_INDEX>
-MutableColumnPtr
+Result<MutableColumnPtr>
ScalarColumnReader<IN_COLLECTION,
OFFSET_INDEX>::convert_dict_column_to_string_column(
const ColumnInt32* dict_column) {
return _chunk_reader->convert_dict_column_to_string_column(dict_column);
diff --git a/be/src/format/parquet/vparquet_column_reader.h
b/be/src/format/parquet/vparquet_column_reader.h
index 3e9b1adc38e..02a94731bab 100644
--- a/be/src/format/parquet/vparquet_column_reader.h
+++ b/be/src/format/parquet/vparquet_column_reader.h
@@ -156,7 +156,8 @@ public:
return Status::NotSupported("read_dict_values_to_column is not
supported");
}
- virtual MutableColumnPtr convert_dict_column_to_string_column(const
ColumnInt32* dict_column) {
+ virtual Result<MutableColumnPtr> convert_dict_column_to_string_column(
+ const ColumnInt32* dict_column) {
throw Exception(
Status::FatalError("Method
convert_dict_column_to_string_column is not supported"));
}
@@ -217,7 +218,8 @@ public:
FilterMap& filter_map, size_t batch_size, size_t*
read_rows, bool* eof,
bool is_dict_filter, int64_t real_column_size =
-1) override;
Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool*
has_dict) override;
- MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) override;
+ Result<MutableColumnPtr> convert_dict_column_to_string_column(
+ const ColumnInt32* dict_column) override;
const std::vector<level_t>& get_rep_level() const override { return
_rep_levels; }
const std::vector<level_t>& get_def_level() const override { return
_def_levels; }
ColumnStatistics column_statistics() override {
diff --git a/be/src/format/parquet/vparquet_group_reader.cpp
b/be/src/format/parquet/vparquet_group_reader.cpp
index 804df762ae6..9e2c899502b 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -342,7 +342,7 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
#endif
if (block->rows() == 0) {
- _convert_dict_cols_to_string_cols(block);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
*read_rows = block->rows();
#ifndef NDEBUG
for (auto col : *block) {
@@ -382,7 +382,7 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
- _convert_dict_cols_to_string_cols(block);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
return Status::OK();
}
@@ -393,7 +393,7 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep,
columns_to_filter)));
}
- _convert_dict_cols_to_string_cols(block);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
}
#ifndef NDEBUG
for (auto col : *block) {
@@ -598,7 +598,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
_cached_filtered_rows += pre_read_rows;
if (pre_raw_read_rows >= config::doris_scanner_row_num) {
*read_rows = 0;
- _convert_dict_cols_to_string_cols(block);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
return Status::OK();
}
} else { // pre_eof
@@ -606,7 +606,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
*read_rows = 0;
*batch_eof = true;
_lazy_read_filtered_rows += (pre_read_rows +
_cached_filtered_rows);
- _convert_dict_cols_to_string_cols(block);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
return Status::OK();
}
} else {
@@ -657,7 +657,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
}
}
- _convert_dict_cols_to_string_cols(block);
+ RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block));
size_t column_num = block->columns();
size_t column_size = 0;
@@ -1136,7 +1136,7 @@ Status
RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,
return Status::OK();
}
-void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
+Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
for (auto& dict_filter_cols : _dict_filter_cols) {
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -1151,9 +1151,9 @@ void
RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
const auto* dict_column = assert_cast<const
ColumnInt32*>(nested_column.get());
DCHECK(dict_column);
- MutableColumnPtr string_column =
+ auto string_column = DORIS_TRY(
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
- dict_column);
+ dict_column));
column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
@@ -1163,15 +1163,16 @@ void
RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
nullable_column->get_null_map_column_ptr()));
} else {
const auto* dict_column = assert_cast<const
ColumnInt32*>(column.get());
- MutableColumnPtr string_column =
+ auto string_column = DORIS_TRY(
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
- dict_column);
+ dict_column));
column_with_type_and_name.type =
std::make_shared<DataTypeString>();
block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first],
std::move(string_column));
}
}
+ return Status::OK();
}
ParquetColumnReader::ColumnStatistics
RowGroupReader::merged_column_statistics() {
diff --git a/be/src/format/parquet/vparquet_group_reader.h
b/be/src/format/parquet/vparquet_group_reader.h
index 3c0bfe2ebff..7e76db5e4c2 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -232,7 +232,7 @@ private:
bool is_dictionary_encoded(const tparquet::ColumnMetaData&
column_metadata);
Status _rewrite_dict_predicates();
Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int
slot_id, bool is_nullable);
- void _convert_dict_cols_to_string_cols(Block* block);
+ Status _convert_dict_cols_to_string_cols(Block* block);
Status _get_current_batch_row_id(size_t read_rows);
Status _fill_row_id_columns(Block* block, size_t read_rows, bool
is_current_row_ids);
diff --git a/be/src/format/parquet/vparquet_page_reader.h
b/be/src/format/parquet/vparquet_page_reader.h
index cc3cc3e2e2f..2d4870256b5 100644
--- a/be/src/format/parquet/vparquet_page_reader.h
+++ b/be/src/format/parquet/vparquet_page_reader.h
@@ -65,6 +65,7 @@ inline bool should_cache_decompressed(const
tparquet::PageHeader* header,
const tparquet::ColumnMetaData&
metadata) {
if (header->compressed_page_size <= 0) return true;
if (metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) return
true;
+ if (header->uncompressed_page_size == 0) return true;
double ratio = static_cast<double>(header->uncompressed_page_size) /
static_cast<double>(header->compressed_page_size);
diff --git
a/be/test/exec/test_data/parquet_scanner/test_string_null.zst.parquet
b/be/test/exec/test_data/parquet_scanner/test_string_null.zst.parquet
new file mode 100644
index 00000000000..7e1b31b73b0
Binary files /dev/null and
b/be/test/exec/test_data/parquet_scanner/test_string_null.zst.parquet differ
diff --git a/be/test/format/parquet/byte_array_dict_decoder_empty_dict_test.cpp
b/be/test/format/parquet/byte_array_dict_decoder_empty_dict_test.cpp
new file mode 100644
index 00000000000..c58eee2bf99
--- /dev/null
+++ b/be/test/format/parquet/byte_array_dict_decoder_empty_dict_test.cpp
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "core/column/column_vector.h"
+#include "format/parquet/byte_array_dict_decoder.h"
+
+namespace doris {
+
+class ByteArrayDictDecoderEmptyDictDataTest : public ::testing::Test {
+protected:
+ void SetUp() override {
+ auto dict_data = make_unique_buffer<uint8_t>(0);
+ ASSERT_TRUE(_decoder.set_dict(dict_data, 0, 0).ok());
+ }
+
+ ByteArrayDictDecoder _decoder;
+};
+
+TEST_F(ByteArrayDictDecoderEmptyDictDataTest,
+ test_convert_dict_column_to_string_column_with_empty_dict_data_error) {
+ MutableColumnPtr dict_column = ColumnInt32::create();
+ dict_column->insert(Field::create_field<TYPE_INT>(0));
+ dict_column->insert(Field::create_field<TYPE_INT>(1));
+ auto io_error =
TEST_RESULT_ERROR(_decoder.convert_dict_column_to_string_column(
+ assert_cast<ColumnInt32*>(dict_column.get())));
+ ASSERT_TRUE(io_error.is<ErrorCode::IO_ERROR>());
+}
+
+TEST_F(ByteArrayDictDecoderEmptyDictDataTest,
+ test_convert_dict_column_to_string_column_with_empty_dict_data_success)
{
+ MutableColumnPtr dict_column = ColumnInt32::create();
+ auto string_column =
TEST_TRY(_decoder.convert_dict_column_to_string_column(
+ assert_cast<ColumnInt32*>(dict_column.get())));
+ ASSERT_EQ(string_column->size(), 0);
+}
+
+} // namespace doris
diff --git a/be/test/format/parquet/byte_array_dict_decoder_test.cpp
b/be/test/format/parquet/byte_array_dict_decoder_test.cpp
index 99b8c83f2fb..3d7545f00f2 100644
--- a/be/test/format/parquet/byte_array_dict_decoder_test.cpp
+++ b/be/test/format/parquet/byte_array_dict_decoder_test.cpp
@@ -460,8 +460,8 @@ TEST_F(ByteArrayDictDecoderTest,
test_convert_dict_column_to_string_column) {
dict_column->insert(Field::create_field<TYPE_INT>(1));
// Convert to string column
- MutableColumnPtr string_column =
_decoder.convert_dict_column_to_string_column(
- assert_cast<ColumnInt32*>(dict_column.get()));
+ auto string_column =
TEST_TRY(_decoder.convert_dict_column_to_string_column(
+ assert_cast<ColumnInt32*>(dict_column.get())));
// Verify results
ASSERT_EQ(string_column->size(), 4);
diff --git a/be/test/format/parquet/fix_length_dict_decoder_empty_dict_test.cpp
b/be/test/format/parquet/fix_length_dict_decoder_empty_dict_test.cpp
new file mode 100644
index 00000000000..e901507b0c3
--- /dev/null
+++ b/be/test/format/parquet/fix_length_dict_decoder_empty_dict_test.cpp
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "core/column/column_vector.h"
+#include "format/parquet/fix_length_dict_decoder.hpp"
+
+namespace doris {
+
+class FixLengthDictDecoderEmptyDictDataTest : public ::testing::Test {
+protected:
+ void SetUp() override {
+ _type_length = 6;
+ _decoder.set_type_length(_type_length);
+ auto dict_data = make_unique_buffer<uint8_t>(0);
+ ASSERT_TRUE(_decoder.set_dict(dict_data, 0, 0).ok());
+ }
+
+ FixLengthDictDecoder<tparquet::Type::FIXED_LEN_BYTE_ARRAY> _decoder;
+ size_t _type_length;
+};
+
+TEST_F(FixLengthDictDecoderEmptyDictDataTest,
+ test_convert_dict_column_to_string_column_with_empty_dict_data_error) {
+ MutableColumnPtr dict_column = ColumnInt32::create();
+ dict_column->insert(Field::create_field<TYPE_INT>(0));
+ dict_column->insert(Field::create_field<TYPE_INT>(1));
+
+ auto io_error =
TEST_RESULT_ERROR(_decoder.convert_dict_column_to_string_column(
+ assert_cast<ColumnInt32*>(dict_column.get())));
+ ASSERT_TRUE(io_error.is<ErrorCode::IO_ERROR>());
+}
+
+TEST_F(FixLengthDictDecoderEmptyDictDataTest,
+ test_convert_dict_column_to_string_column_with_empty_dict_data_success)
{
+ MutableColumnPtr dict_column = ColumnInt32::create();
+
+ auto string_column =
TEST_TRY(_decoder.convert_dict_column_to_string_column(
+ assert_cast<ColumnInt32*>(dict_column.get())));
+
+ ASSERT_EQ(string_column->size(), 0);
+}
+
+} // namespace doris
diff --git a/be/test/format/parquet/fix_length_dict_decoder_test.cpp
b/be/test/format/parquet/fix_length_dict_decoder_test.cpp
index a232ca37f1e..a8050663b43 100644
--- a/be/test/format/parquet/fix_length_dict_decoder_test.cpp
+++ b/be/test/format/parquet/fix_length_dict_decoder_test.cpp
@@ -488,8 +488,8 @@ TEST_F(FixLengthDictDecoderTest,
test_convert_dict_column_to_string_column) {
dict_column->insert(Field::create_field<TYPE_INT>(1));
// Convert to string column
- MutableColumnPtr string_column =
_decoder.convert_dict_column_to_string_column(
- assert_cast<ColumnInt32*>(dict_column.get()));
+ auto string_column =
TEST_TRY(_decoder.convert_dict_column_to_string_column(
+ assert_cast<ColumnInt32*>(dict_column.get())));
// Verify results
ASSERT_EQ(string_column->size(), 4);
diff --git a/be/test/format/parquet/parquet_reader_test.cpp
b/be/test/format/parquet/parquet_reader_test.cpp
index b0dd1256011..1d6e4632842 100644
--- a/be/test/format/parquet/parquet_reader_test.cpp
+++ b/be/test/format/parquet/parquet_reader_test.cpp
@@ -51,12 +51,210 @@
namespace doris {
class VExprContext;
-
+static VExprContextSPtrs create_predicates(DescriptorTbl* desc_tbl,
RuntimeState* runtime_state);
+template <bool filter_all>
+static VExprContextSPtrs create_partition_predicates(DescriptorTbl* desc_tbl,
+ RuntimeState*
runtime_state);
+static VExprContextSPtrs create_only_partition_predicates(DescriptorTbl*
desc_tbl,
+ RuntimeState*
runtime_state);
+static void create_table_desc(TDescriptorTable& t_desc_table,
TTableDescriptor& t_table_desc,
+ std::vector<std::string> table_column_names,
+ std::vector<TPrimitiveType::type> types);
class ParquetReaderTest : public testing::Test {
public:
ParquetReaderTest() : cache(1024) {}
FileMetaCache cache;
+
+ template <bool filter_all, bool enable_lazy>
+ void all_string_null_scan() {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+ std::vector<std::string> table_column_names = {"string_col",
"value_col"};
+ std::vector<TPrimitiveType::type> table_column_types =
{TPrimitiveType::STRING,
+
TPrimitiveType::INT};
+ create_table_desc(t_desc_table, t_table_desc, table_column_names,
table_column_types);
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ auto st = DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+ EXPECT_TRUE(st.ok()) << st;
+
+ auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
+ auto local_fs = io::global_local_filesystem();
+ io::FileReaderSPtr reader;
+ st = local_fs->open_file(
+
"./be/test/exec/test_data/parquet_scanner/test_string_null.zst.parquet",
&reader);
+ EXPECT_TRUE(st.ok()) << st;
+
+ cctz::time_zone ctz;
+ TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone,
ctz);
+ auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ std::vector<std::string> column_names;
+ std::unordered_map<std::string, uint32_t> col_name_to_block_idx;
+ for (int i = 0; i < slot_descs.size(); i++) {
+ column_names.push_back(slot_descs[i]->col_name());
+ col_name_to_block_idx[slot_descs[i]->col_name()] = i;
+ }
+ TFileScanRangeParams scan_params;
+ TFileRangeDesc scan_range;
+ {
+ scan_range.start_offset = 0;
+ scan_range.size = 1000;
+ }
+ auto q_options = TQueryOptions();
+ q_options.__set_enable_adjust_conjunct_order_by_cost(true);
+ RuntimeState runtime_state = RuntimeState(q_options, TQueryGlobals());
+ auto p_reader =
+ std::make_unique<ParquetReader>(nullptr, scan_params,
scan_range, 992, &ctz,
+ nullptr, &runtime_state,
&cache, enable_lazy);
+ p_reader->set_file_reader(reader);
+ runtime_state.set_desc_tbl(desc_tbl);
+ phmap::flat_hash_map<int,
std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
+ auto conjuncts = create_predicates(desc_tbl, &runtime_state);
+ std::unordered_map<int, VExprContextSPtrs> slot_id_to_expr_ctxs;
+ slot_id_to_expr_ctxs[0].emplace_back(conjuncts[0]);
+ slot_id_to_expr_ctxs[1].emplace_back(conjuncts[1]);
+
+ if constexpr (filter_all) {
+ st = p_reader->init_reader(column_names, &col_name_to_block_idx,
conjuncts, tmp,
+ tuple_desc, nullptr, nullptr, nullptr,
+ &slot_id_to_expr_ctxs);
+ } else {
+ st = p_reader->init_reader(column_names, &col_name_to_block_idx,
{}, tmp, nullptr,
+ nullptr, nullptr, nullptr, nullptr);
+ }
+
+ EXPECT_TRUE(st.ok()) << st;
+ std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>
+ partition_columns;
+ std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+ st = p_reader->set_fill_columns(partition_columns, missing_columns);
+ EXPECT_TRUE(st.ok()) << st;
+ bool eof = false;
+ size_t total_rows = 0;
+ bool all_null = true;
+ while (!eof) {
+ BlockUPtr block = Block::create_unique();
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ auto data_type = make_nullable(slot_desc->type());
+ MutableColumnPtr data_column = data_type->create_column();
+ block->insert(ColumnWithTypeAndName(std::move(data_column),
data_type,
+ slot_desc->col_name()));
+ }
+
+ size_t read_row = 0;
+ st = p_reader->get_next_block(block.get(), &read_row, &eof);
+ EXPECT_TRUE(st.ok()) << st;
+ auto col = block->safe_get_by_position(0).column;
+ auto nullable_column = assert_cast<const
ColumnNullable*>(col.get());
+ const auto& null_map = nullable_column->get_null_map_data();
+ for (UInt8 is_null : null_map) {
+ all_null &= is_null != 0;
+ }
+ total_rows += col->size();
+ }
+ EXPECT_TRUE(all_null);
+ if constexpr (filter_all) {
+ EXPECT_EQ(total_rows, 0);
+ } else {
+ EXPECT_EQ(total_rows, 10000);
+ }
+ }
+
+ template <bool filter_all, bool enable_lazy>
+ void all_string_null_scan_with_predicate_partition_column() {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+ std::vector<std::string> table_column_names = {"string_col",
"value_col", "part_col"};
+ std::vector<TPrimitiveType::type> table_column_types = {
+ TPrimitiveType::STRING, TPrimitiveType::INT,
TPrimitiveType::INT};
+ create_table_desc(t_desc_table, t_table_desc, table_column_names,
table_column_types);
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ auto st = DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+ EXPECT_TRUE(st.ok()) << st;
+
+ auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
+ auto local_fs = io::global_local_filesystem();
+ io::FileReaderSPtr reader;
+ st = local_fs->open_file(
+
"./be/test/exec/test_data/parquet_scanner/test_string_null.zst.parquet",
&reader);
+ EXPECT_TRUE(st.ok()) << st;
+
+ cctz::time_zone ctz;
+ TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone,
ctz);
+ auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ std::vector<std::string> column_names;
+ std::unordered_map<std::string, uint32_t> col_name_to_block_idx;
+ for (int i = 0; i < slot_descs.size(); i++) {
+ column_names.push_back(slot_descs[i]->col_name());
+ col_name_to_block_idx[slot_descs[i]->col_name()] = i;
+ }
+
+ TFileScanRangeParams scan_params;
+ TFileRangeDesc scan_range;
+ scan_range.start_offset = 0;
+ scan_range.size = 1000;
+ auto q_options = TQueryOptions();
+ q_options.__set_enable_adjust_conjunct_order_by_cost(true);
+ RuntimeState runtime_state = RuntimeState(q_options, TQueryGlobals());
+ auto p_reader =
+ std::make_unique<ParquetReader>(nullptr, scan_params,
scan_range, 992, &ctz,
+ nullptr, &runtime_state,
&cache, enable_lazy);
+ p_reader->set_file_reader(reader);
+ runtime_state.set_desc_tbl(desc_tbl);
+
+ phmap::flat_hash_map<int,
std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
+ auto conjuncts = create_partition_predicates<filter_all>(desc_tbl,
&runtime_state);
+ std::unordered_map<int, VExprContextSPtrs> slot_id_to_expr_ctxs;
+ slot_id_to_expr_ctxs[1].emplace_back(conjuncts[0]);
+ slot_id_to_expr_ctxs[2].emplace_back(conjuncts[1]);
+
+ st = p_reader->init_reader(column_names, &col_name_to_block_idx,
conjuncts, tmp, tuple_desc,
+ nullptr, nullptr, nullptr,
&slot_id_to_expr_ctxs);
+ EXPECT_TRUE(st.ok()) << st;
+
+ std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>
+ partition_columns;
+ partition_columns.emplace("part_col", std::make_tuple("1",
tuple_desc->slots()[2]));
+ std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+ st = p_reader->set_fill_columns(partition_columns, missing_columns);
+ EXPECT_TRUE(st.ok()) << st;
+
+ bool eof = false;
+ size_t total_rows = 0;
+ bool partition_all_one = true;
+ while (!eof) {
+ BlockUPtr block = Block::create_unique();
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ auto data_type = slot_desc->col_name() == "part_col"
+ ? slot_desc->type()
+ : make_nullable(slot_desc->type());
+ MutableColumnPtr data_column = data_type->create_column();
+ block->insert(ColumnWithTypeAndName(std::move(data_column),
data_type,
+ slot_desc->col_name()));
+ }
+
+ size_t read_row = 0;
+ st = p_reader->get_next_block(block.get(), &read_row, &eof);
+ EXPECT_TRUE(st.ok()) << st;
+
+ auto partition_col = block->safe_get_by_position(2).column;
+ const auto* partition_col_ptr = assert_cast<const
ColumnInt32*>(partition_col.get());
+ const auto& partition_data = partition_col_ptr->get_data();
+ for (size_t i = 0; i < partition_data.size(); ++i) {
+ partition_all_one &= (partition_data[i] == 1);
+ }
+ total_rows += partition_col_ptr->size();
+ }
+
+ EXPECT_TRUE(partition_all_one);
+ if constexpr (filter_all) {
+ EXPECT_EQ(total_rows, 0);
+ } else {
+ EXPECT_EQ(total_rows, 10000);
+ }
+ }
};
static void create_table_desc(TDescriptorTable& t_desc_table,
TTableDescriptor& t_table_desc,
@@ -474,4 +672,346 @@ TEST_F(ParquetReaderTest, varbinary_string2) {
EXPECT_EQ(StringView(string_column->get_data_at(2)).dump_hex(), "0x00");
}
+static VExprContextSPtrs create_predicates(DescriptorTbl* desc_tbl,
RuntimeState* runtime_state) {
+ auto tuple_desc =
const_cast<doris::TupleDescriptor*>(desc_tbl->get_tuple_descriptor(0));
+ doris::RowDescriptor row_desc(tuple_desc);
+ VExprSPtr root;
+ {
+ TFunction fn;
+ TFunctionName fn_name;
+ fn_name.__set_db_name("");
+ fn_name.__set_function_name("eq");
+ fn.__set_name(fn_name);
+ fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+ std::vector<TTypeDesc> arg_types;
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_STRING));
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_STRING));
+ fn.__set_arg_types(arg_types);
+ fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ fn.__set_has_var_args(false);
+
+ TExprNode texpr_node;
+ texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+ texpr_node.__set_opcode(TExprOpcode::EQ);
+ texpr_node.__set_fn(fn);
+ texpr_node.__set_num_children(2);
+ texpr_node.__set_is_nullable(true);
+ root = VectorizedFnCall::create_shared(texpr_node);
+ }
+ { root->add_child(VSlotRef::create_shared(tuple_desc->slots()[0])); }
+ {
+ TExprNode texpr_node;
+ texpr_node.__set_node_type(TExprNodeType::STRING_LITERAL);
+ texpr_node.__set_type(create_type_desc(TYPE_STRING));
+ TStringLiteral string_literal;
+ string_literal.__set_value("test");
+ texpr_node.__set_string_literal(string_literal);
+ texpr_node.__set_is_nullable(false);
+ root->add_child(VLiteral::create_shared(texpr_node));
+ }
+ VExprContextSPtr ctx = VExprContext::create_shared(root);
+
+ auto st = ctx->prepare(runtime_state, row_desc);
+ EXPECT_TRUE(st.ok()) << st;
+ auto st1 = ctx->open(runtime_state);
+ EXPECT_TRUE(st1.ok()) << st1;
+
+ VExprContextSPtrs res = {ctx};
+
+ VExprSPtr root2;
+ {
+ TFunction fn;
+ TFunctionName fn_name;
+ fn_name.__set_db_name("");
+ fn_name.__set_function_name("eq");
+ fn.__set_name(fn_name);
+ fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+ std::vector<TTypeDesc> arg_types;
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ fn.__set_arg_types(arg_types);
+ fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ fn.__set_has_var_args(false);
+
+ TExprNode texpr_node;
+ texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+ texpr_node.__set_opcode(TExprOpcode::EQ);
+ texpr_node.__set_fn(fn);
+ texpr_node.__set_num_children(2);
+ texpr_node.__set_is_nullable(true);
+ root2 = VectorizedFnCall::create_shared(texpr_node);
+ }
+ { root2->add_child(VSlotRef::create_shared(tuple_desc->slots()[1])); }
+ {
+ TExprNode texpr_node;
+ texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+ texpr_node.__set_type(create_type_desc(TYPE_INT));
+ TIntLiteral int_literal;
+ int_literal.__set_value(1);
+ texpr_node.__set_int_literal(int_literal);
+ texpr_node.__set_is_nullable(false);
+ root2->add_child(VLiteral::create_shared(texpr_node));
+ }
+ VExprContextSPtr ctx2 = VExprContext::create_shared(root2);
+
+ auto st2 = ctx2->prepare(runtime_state, row_desc);
+ EXPECT_TRUE(st2.ok()) << st2;
+ auto st3 = ctx2->open(runtime_state);
+ EXPECT_TRUE(st3.ok()) << st3;
+
+ res.push_back(ctx2);
+
+ return res;
+}
+
+template <bool filter_all>
+static VExprContextSPtrs create_partition_predicates(DescriptorTbl* desc_tbl,
+ RuntimeState*
runtime_state) {
+ auto tuple_desc =
const_cast<doris::TupleDescriptor*>(desc_tbl->get_tuple_descriptor(0));
+ doris::RowDescriptor row_desc(tuple_desc);
+ VExprContextSPtrs res;
+
+ VExprSPtr value_eq_root;
+ {
+ TFunction fn;
+ TFunctionName fn_name;
+ fn_name.__set_db_name("");
+ fn_name.__set_function_name("eq");
+ fn.__set_name(fn_name);
+ fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+ std::vector<TTypeDesc> arg_types;
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ fn.__set_arg_types(arg_types);
+ fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ fn.__set_has_var_args(false);
+
+ TExprNode texpr_node;
+ texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+ texpr_node.__set_opcode(TExprOpcode::EQ);
+ texpr_node.__set_fn(fn);
+ texpr_node.__set_num_children(2);
+ texpr_node.__set_is_nullable(true);
+ value_eq_root = VectorizedFnCall::create_shared(texpr_node);
+ }
+ value_eq_root->add_child(VSlotRef::create_shared(tuple_desc->slots()[1]));
+ {
+ TExprNode texpr_node;
+ texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+ texpr_node.__set_type(create_type_desc(TYPE_INT));
+ TIntLiteral int_literal;
+ if constexpr (filter_all) {
+ int_literal.__set_value(0);
+ } else {
+ int_literal.__set_value(1);
+ }
+ texpr_node.__set_int_literal(int_literal);
+ texpr_node.__set_is_nullable(false);
+ value_eq_root->add_child(VLiteral::create_shared(texpr_node));
+ }
+ VExprContextSPtr value_eq_ctx = VExprContext::create_shared(value_eq_root);
+ auto st = value_eq_ctx->prepare(runtime_state, row_desc);
+ EXPECT_TRUE(st.ok()) << st;
+ st = value_eq_ctx->open(runtime_state);
+ EXPECT_TRUE(st.ok()) << st;
+ res.push_back(value_eq_ctx);
+
+ VExprSPtr partition_eq_root;
+ {
+ TFunction fn;
+ TFunctionName fn_name;
+ fn_name.__set_db_name("");
+ fn_name.__set_function_name("eq");
+ fn.__set_name(fn_name);
+ fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+ std::vector<TTypeDesc> arg_types;
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ fn.__set_arg_types(arg_types);
+ fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ fn.__set_has_var_args(false);
+
+ TExprNode texpr_node;
+ texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+ texpr_node.__set_opcode(TExprOpcode::EQ);
+ texpr_node.__set_fn(fn);
+ texpr_node.__set_num_children(2);
+ texpr_node.__set_is_nullable(true);
+ partition_eq_root = VectorizedFnCall::create_shared(texpr_node);
+ }
+
partition_eq_root->add_child(VSlotRef::create_shared(tuple_desc->slots()[2]));
+ {
+ TExprNode texpr_node;
+ texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+ texpr_node.__set_type(create_type_desc(TYPE_INT));
+ TIntLiteral int_literal;
+ int_literal.__set_value(1);
+ texpr_node.__set_int_literal(int_literal);
+ texpr_node.__set_is_nullable(false);
+ partition_eq_root->add_child(VLiteral::create_shared(texpr_node));
+ }
+ VExprContextSPtr partition_eq_ctx =
VExprContext::create_shared(partition_eq_root);
+ st = partition_eq_ctx->prepare(runtime_state, row_desc);
+ EXPECT_TRUE(st.ok()) << st;
+ st = partition_eq_ctx->open(runtime_state);
+ EXPECT_TRUE(st.ok()) << st;
+ res.push_back(partition_eq_ctx);
+
+ return res;
+}
+
+static VExprContextSPtrs create_only_partition_predicates(DescriptorTbl*
desc_tbl,
+ RuntimeState*
runtime_state) {
+ auto tuple_desc =
const_cast<doris::TupleDescriptor*>(desc_tbl->get_tuple_descriptor(0));
+ doris::RowDescriptor row_desc(tuple_desc);
+ VExprContextSPtrs res;
+
+ VExprSPtr partition_eq_root;
+ {
+ TFunction fn;
+ TFunctionName fn_name;
+ fn_name.__set_db_name("");
+ fn_name.__set_function_name("eq");
+ fn.__set_name(fn_name);
+ fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+ std::vector<TTypeDesc> arg_types;
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+ fn.__set_arg_types(arg_types);
+ fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ fn.__set_has_var_args(false);
+
+ TExprNode texpr_node;
+ texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+ texpr_node.__set_opcode(TExprOpcode::EQ);
+ texpr_node.__set_fn(fn);
+ texpr_node.__set_num_children(2);
+ texpr_node.__set_is_nullable(true);
+ partition_eq_root = VectorizedFnCall::create_shared(texpr_node);
+ }
+
partition_eq_root->add_child(VSlotRef::create_shared(tuple_desc->slots()[0]));
+ {
+ TExprNode texpr_node;
+ texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+ texpr_node.__set_type(create_type_desc(TYPE_INT));
+ TIntLiteral int_literal;
+ int_literal.__set_value(1);
+ texpr_node.__set_int_literal(int_literal);
+ texpr_node.__set_is_nullable(false);
+ partition_eq_root->add_child(VLiteral::create_shared(texpr_node));
+ }
+ VExprContextSPtr partition_eq_ctx =
VExprContext::create_shared(partition_eq_root);
+ auto st = partition_eq_ctx->prepare(runtime_state, row_desc);
+ EXPECT_TRUE(st.ok()) << st;
+ st = partition_eq_ctx->open(runtime_state);
+ EXPECT_TRUE(st.ok()) << st;
+ res.push_back(partition_eq_ctx);
+
+ return res;
+}
+
+TEST_F(ParquetReaderTest, all_string_null) {
+ all_string_null_scan<true, true>();
+ all_string_null_scan<true, false>();
+ all_string_null_scan<false, true>();
+ all_string_null_scan<false, false>();
+}
+
+TEST_F(ParquetReaderTest, all_string_null_with_predicate_partition_column) {
+ all_string_null_scan_with_predicate_partition_column<true, true>();
+ all_string_null_scan_with_predicate_partition_column<true, false>();
+ all_string_null_scan_with_predicate_partition_column<false, true>();
+ all_string_null_scan_with_predicate_partition_column<false, false>();
+}
+
+TEST_F(ParquetReaderTest, only_partition_column) {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+ std::vector<std::string> table_column_names = {"part_col"};
+ std::vector<TPrimitiveType::type> table_column_types =
{TPrimitiveType::INT};
+ create_table_desc(t_desc_table, t_table_desc, table_column_names,
table_column_types);
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ auto st = DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+ EXPECT_TRUE(st.ok()) << st;
+
+ auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
+ auto local_fs = io::global_local_filesystem();
+ io::FileReaderSPtr reader;
+ st = local_fs->open_file(
+
"./be/test/exec/test_data/parquet_scanner/test_string_null.zst.parquet",
&reader);
+ EXPECT_TRUE(st.ok()) << st;
+
+ cctz::time_zone ctz;
+ TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
+ auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ std::vector<std::string> column_names;
+ std::unordered_map<std::string, uint32_t> col_name_to_block_idx;
+ for (int i = 0; i < slot_descs.size(); i++) {
+ column_names.push_back(slot_descs[i]->col_name());
+ col_name_to_block_idx[slot_descs[i]->col_name()] = i;
+ }
+
+ TFileScanRangeParams scan_params;
+ TFileRangeDesc scan_range;
+ scan_range.start_offset = 0;
+ scan_range.size = 1000;
+ auto q_options = TQueryOptions();
+ q_options.__set_enable_adjust_conjunct_order_by_cost(true);
+ RuntimeState runtime_state = RuntimeState(q_options, TQueryGlobals());
+ auto p_reader = std::make_unique<ParquetReader>(nullptr, scan_params,
scan_range, 992, &ctz,
+ nullptr, &runtime_state,
&cache);
+ p_reader->set_file_reader(reader);
+ runtime_state.set_desc_tbl(desc_tbl);
+
+ phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>
tmp;
+ auto conjuncts = create_only_partition_predicates(desc_tbl,
&runtime_state);
+ std::unordered_map<int, VExprContextSPtrs> slot_id_to_expr_ctxs;
+ slot_id_to_expr_ctxs[0].emplace_back(conjuncts[0]);
+
+ st = p_reader->init_reader(column_names, &col_name_to_block_idx,
conjuncts, tmp, tuple_desc,
+ nullptr, nullptr, nullptr,
&slot_id_to_expr_ctxs);
+ EXPECT_TRUE(st.ok()) << st;
+
+ std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>
+ partition_columns;
+ partition_columns.emplace("part_col", std::make_tuple("1",
tuple_desc->slots()[0]));
+ std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+ st = p_reader->set_fill_columns(partition_columns, missing_columns);
+ EXPECT_TRUE(st.ok()) << st;
+
+ bool eof = false;
+ size_t total_rows = 0;
+ bool partition_all_one = true;
+ while (!eof) {
+ BlockUPtr block = Block::create_unique();
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ auto data_type = slot_desc->type();
+ MutableColumnPtr data_column = data_type->create_column();
+ block->insert(ColumnWithTypeAndName(std::move(data_column),
data_type,
+ slot_desc->col_name()));
+ }
+
+ size_t read_row = 0;
+ st = p_reader->get_next_block(block.get(), &read_row, &eof);
+ EXPECT_TRUE(st.ok()) << st;
+
+ auto partition_col = block->safe_get_by_position(0).column;
+ const auto* partition_col_ptr = assert_cast<const
ColumnInt32*>(partition_col.get());
+ const auto& partition_data = partition_col_ptr->get_data();
+ for (size_t i = 0; i < partition_data.size(); ++i) {
+ partition_all_one &= (partition_data[i] == 1);
+ }
+ total_rows += partition_col_ptr->size();
+ }
+
+ EXPECT_TRUE(partition_all_one);
+ EXPECT_EQ(total_rows, 10000);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]