This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new bcd0df19 Fix null field in cpp. (#668)
bcd0df19 is described below
commit bcd0df1958d1e66659adc7b0ec0ac1f2a9fc0824
Author: Colin Lee <[email protected]>
AuthorDate: Fri Dec 19 11:36:42 2025 +0800
Fix null field in cpp. (#668)
---
.github/workflows/unit-test-cpp.yml | 1 +
.github/workflows/unit-test-python.yml | 1 +
cpp/src/common/tsblock/tsblock.h | 12 --
cpp/src/encoding/dictionary_decoder.h | 24 +--
cpp/src/encoding/gorilla_decoder.h | 162 +++++++--------
cpp/src/encoding/int32_rle_decoder.h | 36 ++--
cpp/src/encoding/plain_decoder.h | 27 ++-
cpp/src/encoding/sprintz_encoder.h | 2 +-
cpp/src/reader/aligned_chunk_reader.cc | 45 +++-
cpp/src/reader/aligned_chunk_reader.h | 99 ++++-----
cpp/src/reader/result_set.h | 5 +-
cpp/src/utils/errno_define.h | 4 +-
cpp/src/utils/util_define.h | 7 +
.../reader/table_view/tsfile_reader_table_test.cc | 230 +++++++++++++++++++++
14 files changed, 459 insertions(+), 196 deletions(-)
diff --git a/.github/workflows/unit-test-cpp.yml b/.github/workflows/unit-test-cpp.yml
index d4172f8b..15a62991 100644
--- a/.github/workflows/unit-test-cpp.yml
+++ b/.github/workflows/unit-test-cpp.yml
@@ -111,6 +111,7 @@ jobs:
if [[ "$RUNNER_OS" == "Linux" ]]; then
sudo update-alternatives --install /usr/bin/clang-format
clang-format /usr/bin/clang-format-17 100
sudo update-alternatives --set clang-format
/usr/bin/clang-format-17
+ sudo apt-get update
sudo apt-get install -y uuid-dev
elif [[ "$RUNNER_OS" == "Windows" ]]; then
choco install llvm --version 17.0.6 --force
diff --git a/.github/workflows/unit-test-python.yml b/.github/workflows/unit-test-python.yml
index 72be3236..f1f799b7 100644
--- a/.github/workflows/unit-test-python.yml
+++ b/.github/workflows/unit-test-python.yml
@@ -82,6 +82,7 @@ jobs:
if [[ "$RUNNER_OS" == "Linux" ]]; then
sudo update-alternatives --install /usr/bin/clang-format
clang-format /usr/bin/clang-format-17 100
sudo update-alternatives --set clang-format
/usr/bin/clang-format-17
+ sudo apt-get update
sudo apt-get install -y uuid-dev
elif [[ "$RUNNER_OS" == "Windows" ]]; then
choco install llvm --version 17.0.6 --force
diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index dce94f8a..859ad393 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -76,18 +76,6 @@ class TsBlock {
capacity_ += extend_size;
}
- // need to call flush_row_count after using colappender
- FORCE_INLINE int flush_row_count(uint32_t row_count) {
- int errnum = E_OK;
- if (row_count_ == 0) {
- row_count_ = row_count;
- } else if (row_count_ != row_count) {
- LOGE("Inconsistent number of rows in two columns");
- errnum = E_TSBLOCK_DATA_INCONSISTENCY;
- }
- return errnum;
- }
-
FORCE_INLINE void fill_trailling_nulls() {
for (uint32_t i = 0; i < get_column_count(); ++i) {
for (uint32_t j = vectors_[i]->get_row_num(); j < row_count_; ++j)
{
diff --git a/cpp/src/encoding/dictionary_decoder.h b/cpp/src/encoding/dictionary_decoder.h
index 0dce5a2b..5f64b587 100644
--- a/cpp/src/encoding/dictionary_decoder.h
+++ b/cpp/src/encoding/dictionary_decoder.h
@@ -37,39 +37,39 @@ class DictionaryDecoder : public Decoder {
public:
~DictionaryDecoder() override = default;
- bool has_remaining(const common::ByteStream &buffer) {
+ bool has_remaining(const common::ByteStream& buffer) override {
return (!entry_index_.empty() && value_decoder_.has_next_package()) ||
buffer.has_remaining();
}
- int read_boolean(bool &ret_value, common::ByteStream &in) override {
+ int read_boolean(bool& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_int32(int32_t &ret_value, common::ByteStream &in) override {
+ int read_int32(int32_t& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_int64(int64_t &ret_value, common::ByteStream &in) override {
+ int read_int64(int64_t& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_float(float &ret_value, common::ByteStream &in) override {
+ int read_float(float& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_double(double &ret_value, common::ByteStream &in) override {
+ int read_double(double& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_String(common::String &ret_value, common::PageArena &pa,
- common::ByteStream &in) {
+ int read_String(common::String& ret_value, common::PageArena& pa,
+ common::ByteStream& in) override {
auto std_str = read_string(in);
return ret_value.dup_from(std_str, pa);
}
void init() { value_decoder_.init(); }
- void reset() {
+ void reset() override {
value_decoder_.reset();
entry_index_.clear();
}
- std::string read_string(common::ByteStream &buffer) {
+ std::string read_string(common::ByteStream& buffer) {
if (entry_index_.empty()) {
init_map(buffer);
}
@@ -77,14 +77,14 @@ class DictionaryDecoder : public Decoder {
return entry_index_[code];
}
- bool has_next(common::ByteStream &buffer) {
+ bool has_next(common::ByteStream& buffer) {
if (entry_index_.empty()) {
init_map(buffer);
}
return value_decoder_.has_next(buffer);
}
- int init_map(common::ByteStream &buffer) {
+ int init_map(common::ByteStream& buffer) {
int ret = common::E_OK;
int length = 0;
if (RET_FAIL(common::SerializationUtil::read_var_int(length, buffer)))
{
diff --git a/cpp/src/encoding/gorilla_decoder.h b/cpp/src/encoding/gorilla_decoder.h
index e2b62063..4429f2b5 100644
--- a/cpp/src/encoding/gorilla_decoder.h
+++ b/cpp/src/encoding/gorilla_decoder.h
@@ -49,12 +49,12 @@ class GorillaDecoder : public Decoder {
}
FORCE_INLINE bool has_next() { return has_next_; }
- FORCE_INLINE bool has_remaining(const common::ByteStream &buffer) {
+ FORCE_INLINE bool has_remaining(const common::ByteStream& buffer) override
{
return buffer.has_remaining() || has_next();
}
// If empty, cache 8 bits from in_stream to 'buffer_'.
- void flush_byte_if_empty(common::ByteStream &in) {
+ void flush_byte_if_empty(common::ByteStream& in) {
if (bits_left_ == 0) {
uint32_t read_len = 0;
in.read_buf(&buffer_, 1, read_len);
@@ -63,7 +63,7 @@ class GorillaDecoder : public Decoder {
}
// Reads the next bit and returns true if the next bit is 1, otherwise 0.
- bool read_bit(common::ByteStream &in) {
+ bool read_bit(common::ByteStream& in) {
bool bit = ((buffer_ >> (bits_left_ - 1)) & 1) == 1;
bits_left_--;
flush_byte_if_empty(in);
@@ -76,7 +76,7 @@ class GorillaDecoder : public Decoder {
* @bits: How many next bits are reader from the stream
* return: long value that was reader from the stream
*/
- int64_t read_long(int bits, common::ByteStream &in) {
+ int64_t read_long(int bits, common::ByteStream& in) {
int64_t value = 0;
while (bits > 0) {
if (bits > bits_left_ || bits == 8) {
@@ -101,7 +101,7 @@ class GorillaDecoder : public Decoder {
}
// Read the control bits
- uint8_t read_next_control_bit(int max_bits, common::ByteStream &in) {
+ uint8_t read_next_control_bit(int max_bits, common::ByteStream& in) {
uint8_t value = 0x00;
for (int i = 0; i < max_bits; i++) {
value <<= 1;
@@ -114,18 +114,18 @@ class GorillaDecoder : public Decoder {
return value;
}
- T read_next(common::ByteStream &in);
- virtual T cache_next(common::ByteStream &in);
- T decode(common::ByteStream &in);
+ T read_next(common::ByteStream& in);
+ virtual T cache_next(common::ByteStream& in);
+ T decode(common::ByteStream& in);
// interface from Decoder
- int read_boolean(bool &ret_value, common::ByteStream &in);
- int read_int32(int32_t &ret_value, common::ByteStream &in);
- int read_int64(int64_t &ret_value, common::ByteStream &in);
- int read_float(float &ret_value, common::ByteStream &in);
- int read_double(double &ret_value, common::ByteStream &in);
- int read_String(common::String &ret_value, common::PageArena &pa,
- common::ByteStream &in);
+ int read_boolean(bool& ret_value, common::ByteStream& in) override;
+ int read_int32(int32_t& ret_value, common::ByteStream& in) override;
+ int read_int64(int64_t& ret_value, common::ByteStream& in) override;
+ int read_float(float& ret_value, common::ByteStream& in) override;
+ int read_double(double& ret_value, common::ByteStream& in) override;
+ int read_String(common::String& ret_value, common::PageArena& pa,
+ common::ByteStream& in) override;
public:
common::TSEncoding type_;
@@ -140,7 +140,7 @@ class GorillaDecoder : public Decoder {
template <>
FORCE_INLINE int32_t
-GorillaDecoder<int32_t>::read_next(common::ByteStream &in) {
+GorillaDecoder<int32_t>::read_next(common::ByteStream& in) {
uint8_t control_bits = read_next_control_bit(2, in);
uint8_t significant_bits = 0;
int32_t xor_value = 0;
@@ -172,7 +172,7 @@ GorillaDecoder<int32_t>::read_next(common::ByteStream &in) {
template <>
FORCE_INLINE int64_t
-GorillaDecoder<int64_t>::read_next(common::ByteStream &in) {
+GorillaDecoder<int64_t>::read_next(common::ByteStream& in) {
uint8_t control_bits = read_next_control_bit(2, in);
uint8_t significant_bits = 0;
@@ -208,7 +208,7 @@ GorillaDecoder<int64_t>::read_next(common::ByteStream &in) {
template <>
FORCE_INLINE int32_t
-GorillaDecoder<int32_t>::cache_next(common::ByteStream &in) {
+GorillaDecoder<int32_t>::cache_next(common::ByteStream& in) {
read_next(in);
if (stored_value_ == GORILLA_ENCODING_ENDING_INTEGER) {
has_next_ = false;
@@ -218,7 +218,7 @@ GorillaDecoder<int32_t>::cache_next(common::ByteStream &in) {
template <>
FORCE_INLINE int64_t
-GorillaDecoder<int64_t>::cache_next(common::ByteStream &in) {
+GorillaDecoder<int64_t>::cache_next(common::ByteStream& in) {
read_next(in);
if (stored_value_ == GORILLA_ENCODING_ENDING_LONG) {
has_next_ = false;
@@ -227,7 +227,7 @@ GorillaDecoder<int64_t>::cache_next(common::ByteStream &in) {
}
template <>
-FORCE_INLINE int32_t GorillaDecoder<int32_t>::decode(common::ByteStream &in) {
+FORCE_INLINE int32_t GorillaDecoder<int32_t>::decode(common::ByteStream& in) {
int32_t ret_value = stored_value_;
if (UNLIKELY(!first_value_was_read_)) {
flush_byte_if_empty(in);
@@ -240,7 +240,7 @@ FORCE_INLINE int32_t GorillaDecoder<int32_t>::decode(common::ByteStream &in) {
}
template <>
-FORCE_INLINE int64_t GorillaDecoder<int64_t>::decode(common::ByteStream &in) {
+FORCE_INLINE int64_t GorillaDecoder<int64_t>::decode(common::ByteStream& in) {
int64_t ret_value = stored_value_;
if (UNLIKELY(!first_value_was_read_)) {
flush_byte_if_empty(in);
@@ -254,18 +254,18 @@ FORCE_INLINE int64_t GorillaDecoder<int64_t>::decode(common::ByteStream &in) {
class FloatGorillaDecoder : public GorillaDecoder<int32_t> {
public:
- int read_boolean(bool &ret_value, common::ByteStream &in);
- int read_int32(int32_t &ret_value, common::ByteStream &in);
- int read_int64(int64_t &ret_value, common::ByteStream &in);
- int read_float(float &ret_value, common::ByteStream &in);
- int read_double(double &ret_value, common::ByteStream &in);
+ int read_boolean(bool& ret_value, common::ByteStream& in);
+ int read_int32(int32_t& ret_value, common::ByteStream& in);
+ int read_int64(int64_t& ret_value, common::ByteStream& in);
+ int read_float(float& ret_value, common::ByteStream& in);
+ int read_double(double& ret_value, common::ByteStream& in);
- float decode(common::ByteStream &in) {
+ float decode(common::ByteStream& in) {
int32_t value_int = GorillaDecoder<int32_t>::decode(in);
return common::int_to_float(value_int);
}
- int32_t cache_next(common::ByteStream &in) {
+ int32_t cache_next(common::ByteStream& in) {
read_next(in);
if (stored_value_ ==
common::float_to_int(GORILLA_ENCODING_ENDING_FLOAT)) {
@@ -277,18 +277,18 @@ class FloatGorillaDecoder : public GorillaDecoder<int32_t> {
class DoubleGorillaDecoder : public GorillaDecoder<int64_t> {
public:
- int read_boolean(bool &ret_value, common::ByteStream &in);
- int read_int32(int32_t &ret_value, common::ByteStream &in);
- int read_int64(int64_t &ret_value, common::ByteStream &in);
- int read_float(float &ret_value, common::ByteStream &in);
- int read_double(double &ret_value, common::ByteStream &in);
+ int read_boolean(bool& ret_value, common::ByteStream& in);
+ int read_int32(int32_t& ret_value, common::ByteStream& in);
+ int read_int64(int64_t& ret_value, common::ByteStream& in);
+ int read_float(float& ret_value, common::ByteStream& in);
+ int read_double(double& ret_value, common::ByteStream& in);
- double decode(common::ByteStream &in) {
+ double decode(common::ByteStream& in) {
int64_t value_long = GorillaDecoder<int64_t>::decode(in);
return common::long_to_double(value_long);
}
- int64_t cache_next(common::ByteStream &in) {
+ int64_t cache_next(common::ByteStream& in) {
read_next(in);
if (stored_value_ ==
common::double_to_long(GORILLA_ENCODING_ENDING_DOUBLE)) {
@@ -303,126 +303,126 @@ typedef GorillaDecoder<int64_t> LongGorillaDecoder;
// wrap as Decoder interface
template <>
-FORCE_INLINE int IntGorillaDecoder::read_boolean(bool &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int IntGorillaDecoder::read_boolean(bool& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int IntGorillaDecoder::read_int32(int32_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int IntGorillaDecoder::read_int32(int32_t& ret_value,
+ common::ByteStream& in) {
ret_value = decode(in);
return common::E_OK;
}
template <>
-FORCE_INLINE int IntGorillaDecoder::read_int64(int64_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int IntGorillaDecoder::read_int64(int64_t& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int IntGorillaDecoder::read_float(float &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int IntGorillaDecoder::read_float(float& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int IntGorillaDecoder::read_double(double &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int IntGorillaDecoder::read_double(double& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int IntGorillaDecoder::read_String(common::String &ret_value,
- common::PageArena &pa,
- common::ByteStream &in) {
+FORCE_INLINE int IntGorillaDecoder::read_String(common::String& ret_value,
+ common::PageArena& pa,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int LongGorillaDecoder::read_boolean(bool &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int LongGorillaDecoder::read_boolean(bool& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int LongGorillaDecoder::read_int32(int32_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int LongGorillaDecoder::read_int32(int32_t& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int LongGorillaDecoder::read_int64(int64_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int LongGorillaDecoder::read_int64(int64_t& ret_value,
+ common::ByteStream& in) {
ret_value = decode(in);
return common::E_OK;
}
template <>
-FORCE_INLINE int LongGorillaDecoder::read_float(float &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int LongGorillaDecoder::read_float(float& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int LongGorillaDecoder::read_double(double &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int LongGorillaDecoder::read_double(double& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
template <>
-FORCE_INLINE int LongGorillaDecoder::read_String(common::String &ret_value,
- common::PageArena &pa,
- common::ByteStream &in) {
+FORCE_INLINE int LongGorillaDecoder::read_String(common::String& ret_value,
+ common::PageArena& pa,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int FloatGorillaDecoder::read_boolean(bool &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int FloatGorillaDecoder::read_boolean(bool& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int FloatGorillaDecoder::read_int32(int32_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int FloatGorillaDecoder::read_int32(int32_t& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int FloatGorillaDecoder::read_int64(int64_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int FloatGorillaDecoder::read_int64(int64_t& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int FloatGorillaDecoder::read_float(float &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int FloatGorillaDecoder::read_float(float& ret_value,
+ common::ByteStream& in) {
ret_value = decode(in);
return common::E_OK;
}
-FORCE_INLINE int FloatGorillaDecoder::read_double(double &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int FloatGorillaDecoder::read_double(double& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int DoubleGorillaDecoder::read_boolean(bool &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int DoubleGorillaDecoder::read_boolean(bool& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int DoubleGorillaDecoder::read_int32(int32_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int DoubleGorillaDecoder::read_int32(int32_t& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int DoubleGorillaDecoder::read_int64(int64_t &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int DoubleGorillaDecoder::read_int64(int64_t& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int DoubleGorillaDecoder::read_float(float &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int DoubleGorillaDecoder::read_float(float& ret_value,
+ common::ByteStream& in) {
ASSERT(false);
return common::E_NOT_SUPPORT;
}
-FORCE_INLINE int DoubleGorillaDecoder::read_double(double &ret_value,
- common::ByteStream &in) {
+FORCE_INLINE int DoubleGorillaDecoder::read_double(double& ret_value,
+ common::ByteStream& in) {
ret_value = decode(in);
return common::E_OK;
}
diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h
index 647f095a..757a9259 100644
--- a/cpp/src/encoding/int32_rle_decoder.h
+++ b/cpp/src/encoding/int32_rle_decoder.h
@@ -38,9 +38,9 @@ class Int32RleDecoder : public Decoder {
bool is_length_and_bitwidth_readed_;
int current_count_;
common::ByteStream byte_cache_;
- int32_t *current_buffer_;
- Int32Packer *packer_;
- uint8_t *tmp_buf_;
+ int32_t* current_buffer_;
+ Int32Packer* packer_;
+ uint8_t* tmp_buf_;
public:
Int32RleDecoder()
@@ -55,30 +55,30 @@ class Int32RleDecoder : public Decoder {
tmp_buf_(nullptr) {}
~Int32RleDecoder() override { destroy(); }
- bool has_remaining(const common::ByteStream &buffer) override {
+ bool has_remaining(const common::ByteStream& buffer) override {
return buffer.has_remaining() || has_next_package();
}
- int read_boolean(bool &ret_value, common::ByteStream &in) {
+ int read_boolean(bool& ret_value, common::ByteStream& in) override {
int32_t bool_value;
read_int32(bool_value, in);
ret_value = bool_value == 0 ? false : true;
return common::E_OK;
}
- int read_int32(int32_t &ret_value, common::ByteStream &in) override {
+ int read_int32(int32_t& ret_value, common::ByteStream& in) override {
ret_value = static_cast<int32_t>(read_int(in));
return common::E_OK;
}
- int read_int64(int64_t &ret_value, common::ByteStream &in) override {
+ int read_int64(int64_t& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_float(float &ret_value, common::ByteStream &in) override {
+ int read_float(float& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_double(double &ret_value, common::ByteStream &in) override {
+ int read_double(double& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_String(common::String &ret_value, common::PageArena &pa,
- common::ByteStream &in) override {
+ int read_String(common::String& ret_value, common::PageArena& pa,
+ common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
@@ -91,7 +91,7 @@ class Int32RleDecoder : public Decoder {
current_count_ = 0;
}
- bool has_next(common::ByteStream &buffer) {
+ bool has_next(common::ByteStream& buffer) {
if (current_count_ > 0 || buffer.remaining_size() > 0 ||
has_next_package()) {
return true;
@@ -103,7 +103,7 @@ class Int32RleDecoder : public Decoder {
return current_count_ > 0 || byte_cache_.remaining_size() > 0;
}
- int32_t read_int(common::ByteStream &buffer) {
+ int32_t read_int(common::ByteStream& buffer) {
if (!is_length_and_bitwidth_readed_) {
// start to reader a new rle+bit-packing pattern
read_length_and_bitwidth(buffer);
@@ -153,7 +153,7 @@ class Int32RleDecoder : public Decoder {
if (current_buffer_ != nullptr) {
common::mem_free(current_buffer_);
}
- current_buffer_ = static_cast<int32_t *>(
+ current_buffer_ = static_cast<int32_t*>(
common::mem_alloc(sizeof(int32_t) * bit_packed_group_count * 8,
common::MOD_DECODER_OBJ));
if (IS_NULL(current_buffer_)) {
@@ -179,7 +179,7 @@ class Int32RleDecoder : public Decoder {
return ret;
}
- int read_length_and_bitwidth(common::ByteStream &buffer) {
+ int read_length_and_bitwidth(common::ByteStream& buffer) {
int ret = common::E_OK;
if (RET_FAIL(
common::SerializationUtil::read_var_uint(length_, buffer))) {
@@ -189,18 +189,18 @@ class Int32RleDecoder : public Decoder {
common::mem_free(tmp_buf_);
}
tmp_buf_ =
- (uint8_t *)common::mem_alloc(length_, common::MOD_DECODER_OBJ);
+ (uint8_t*)common::mem_alloc(length_, common::MOD_DECODER_OBJ);
if (tmp_buf_ == nullptr) {
return common::E_OOM;
}
uint32_t ret_read_len = 0;
- if (RET_FAIL(buffer.read_buf((uint8_t *)tmp_buf_, length_,
+ if (RET_FAIL(buffer.read_buf((uint8_t*)tmp_buf_, length_,
ret_read_len))) {
return ret;
} else if (length_ != ret_read_len) {
ret = common::E_PARTIAL_READ;
}
- byte_cache_.wrap_from((char *)tmp_buf_, length_);
+ byte_cache_.wrap_from((char*)tmp_buf_, length_);
is_length_and_bitwidth_readed_ = true;
uint8_t tmp_bit_width;
common::SerializationUtil::read_ui8(tmp_bit_width, byte_cache_);
diff --git a/cpp/src/encoding/plain_decoder.h b/cpp/src/encoding/plain_decoder.h
index d1c6969e..c2627f71 100644
--- a/cpp/src/encoding/plain_decoder.h
+++ b/cpp/src/encoding/plain_decoder.h
@@ -27,34 +27,39 @@ namespace storage {
class PlainDecoder : public Decoder {
public:
~PlainDecoder() override = default;
- FORCE_INLINE void reset() { /* do nothing */
+ FORCE_INLINE void reset() override { /* do nothing */
}
- FORCE_INLINE bool has_remaining(const common::ByteStream &buffer) {
+ FORCE_INLINE bool has_remaining(const common::ByteStream& buffer) override
{
return buffer.has_remaining();
}
- FORCE_INLINE int read_boolean(bool &ret_bool, common::ByteStream &in) {
- return common::SerializationUtil::read_ui8((uint8_t &)ret_bool, in);
+ FORCE_INLINE int read_boolean(bool& ret_bool,
+ common::ByteStream& in) override {
+ return common::SerializationUtil::read_ui8((uint8_t&)ret_bool, in);
}
- FORCE_INLINE int read_int32(int32_t &ret_int32, common::ByteStream &in) {
+ FORCE_INLINE int read_int32(int32_t& ret_int32,
+ common::ByteStream& in) override {
return common::SerializationUtil::read_var_int(ret_int32, in);
}
- FORCE_INLINE int read_int64(int64_t &ret_int64, common::ByteStream &in) {
+ FORCE_INLINE int read_int64(int64_t& ret_int64,
+ common::ByteStream& in) override {
return common::SerializationUtil::read_i64(ret_int64, in);
}
- FORCE_INLINE int read_float(float &ret_float, common::ByteStream &in) {
+ FORCE_INLINE int read_float(float& ret_float,
+ common::ByteStream& in) override {
return common::SerializationUtil::read_float(ret_float, in);
}
- FORCE_INLINE int read_double(double &ret_double, common::ByteStream &in) {
+ FORCE_INLINE int read_double(double& ret_double,
+ common::ByteStream& in) override {
return common::SerializationUtil::read_double(ret_double, in);
}
- FORCE_INLINE int read_String(common::String &ret_String,
- common::PageArena &pa,
- common::ByteStream &in) {
+ FORCE_INLINE int read_String(common::String& ret_String,
+ common::PageArena& pa,
+ common::ByteStream& in) override {
return common::SerializationUtil::read_mystring(ret_String, &pa, in);
}
};
diff --git a/cpp/src/encoding/sprintz_encoder.h b/cpp/src/encoding/sprintz_encoder.h
index 30934480..04906f12 100644
--- a/cpp/src/encoding/sprintz_encoder.h
+++ b/cpp/src/encoding/sprintz_encoder.h
@@ -34,7 +34,7 @@ class SprintzEncoder : public Encoder {
predict_method_ = method;
}
- virtual void reset() {
+ virtual void reset() override {
byte_cache_.reset();
is_first_cached_ = false;
group_num_ = 0;
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index b39db564..14250e7f 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -253,6 +253,12 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta,
int cur_page_header_serialized_size = 0;
// TODO： configurable
int retry_read_want_size = 1024;
+ if (chunk_visit_offset - chunk_header.serialized_size_ >=
+ chunk_header.data_size_) {
+ cur_page_header.reset();
+ return E_OK;
+ }
+
do {
in_stream.mark_read_pos();
cur_page_header.reset();
@@ -434,6 +440,11 @@ int AlignedChunkReader::decode_cur_value_page_data() {
char* value_buf = nullptr;
uint32_t value_buf_size = 0;
+ if (cur_value_page_header_.compressed_size_ == 0) {
+ value_in_.wrap_from(value_buf, 0);
+ return E_OK;
+ }
+
// Step 2: do uncompress
if (IS_SUCC(ret)) {
value_compressed_buf =
@@ -521,10 +532,10 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
uint32_t mask = 1 << 7;
\
int64_t time = 0;
\
CppType value;
\
- while (time_decoder_->has_remaining(time_in) &&
\
- value_decoder_->has_remaining(value_in)) {
\
+ while (time_decoder_->has_remaining(time_in)) {
\
cur_value_index++;
\
- if (((value_page_col_notnull_bitmap_[cur_value_index / 8] &
\
+ if (value_page_col_notnull_bitmap_.empty() ||
\
+ ((value_page_col_notnull_bitmap_[cur_value_index / 8] &
\
0xFF) &
\
(mask >> (cur_value_index % 8))) == 0) {
\
ret = time_decoder_->read_int64(time, time_in);
\
@@ -539,6 +550,10 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
row_appender.append_null(1);
\
continue;
\
}
\
+ assert(value_decoder_->has_remaining(value_in));
\
+ if (!value_decoder_->has_remaining(value_in)) {
\
+ return common::E_DATA_INCONSISTENCY;
\
+ }
\
if (UNLIKELY(!row_appender.add_row())) {
\
ret = E_OVERFLOW;
\
cur_value_index--;
\
@@ -565,10 +580,10 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
uint32_t mask = 1 << 7;
int64_t time = 0;
int32_t value;
- while (time_decoder_->has_remaining(time_in) &&
- value_decoder_->has_remaining(value_in)) {
+ while (time_decoder_->has_remaining(time_in)) {
cur_value_index++;
- if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
+ if (value_page_col_notnull_bitmap_.empty() ||
+ ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
(mask >> (cur_value_index % 8))) == 0) {
ret = time_decoder_->read_int64(time, time_in);
if (ret != E_OK) {
@@ -582,6 +597,10 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
row_appender.append_null(1);
continue;
}
+ assert(value_decoder_->has_remaining(value_in));
+ if (!value_decoder_->has_remaining(value_in)) {
+ return common::E_DATA_INCONSISTENCY;
+ }
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
cur_value_index--;
@@ -654,14 +673,22 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
int64_t time = 0;
common::String value;
uint32_t mask = 1 << 7;
- while (time_decoder_->has_remaining(time_in) &&
- value_decoder_->has_remaining(value_in)) {
+ while (time_decoder_->has_remaining(time_in)) {
cur_value_index++;
bool should_read_data = true;
- if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
+ if (value_page_col_notnull_bitmap_.empty() ||
+ ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
(mask >> (cur_value_index % 8))) == 0) {
should_read_data = false;
}
+
+ if (should_read_data) {
+ assert(value_decoder_->has_remaining(value_in));
+ if (!value_decoder_->has_remaining(value_in)) {
+ return E_DATA_INCONSISTENCY;
+ }
+ }
+
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
cur_value_index--;
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index 7bf29047..aefb7bc5 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -57,8 +57,8 @@ class AlignedChunkReader : public IChunkReader {
time_uncompressed_buf_(nullptr),
value_uncompressed_buf_(nullptr),
cur_value_index(-1) {}
- int init(ReadFile *read_file, common::String m_name,
- common::TSDataType data_type, Filter *time_filter) override;
+ int init(ReadFile* read_file, common::String m_name,
+ common::TSDataType data_type, Filter* time_filter) override;
void reset() override;
void destroy() override;
~AlignedChunkReader() override = default;
@@ -67,43 +67,46 @@ class AlignedChunkReader : public IChunkReader {
return prev_value_page_not_finish() ||
(value_chunk_visit_offset_ -
value_chunk_header_.serialized_size_ <
- value_chunk_header_.data_size_);
+ value_chunk_header_.data_size_) ||
+ prev_time_page_not_finish() ||
+ (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_
<
+ time_chunk_header_.data_size_);
}
- ChunkHeader &get_chunk_header() override { return value_chunk_header_; }
- int load_by_aligned_meta(ChunkMeta *time_meta,
- ChunkMeta *value_meta) override;
+ ChunkHeader& get_chunk_header() override { return value_chunk_header_; }
+ int load_by_aligned_meta(ChunkMeta* time_meta,
+ ChunkMeta* value_meta) override;
- int get_next_page(common::TsBlock *tsblock, Filter *oneshoot_filter,
- common::PageArena &pa) override;
+ int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
+ common::PageArena& pa) override;
private:
FORCE_INLINE bool chunk_has_only_one_page(
- const ChunkHeader &chunk_header) const {
+ const ChunkHeader& chunk_header) const {
return (chunk_header.chunk_type_ & ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)
==
ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
}
- int alloc_compressor_and_decoder(storage::Decoder *&decoder,
- storage::Compressor *&compressor,
+ int alloc_compressor_and_decoder(storage::Decoder*& decoder,
+ storage::Compressor*& compressor,
common::TSEncoding encoding,
common::TSDataType data_type,
common::CompressionType compression_type);
- int get_cur_page_header(ChunkMeta *&chunk_meta,
- common::ByteStream &in_stream_,
- PageHeader &cur_page_header_,
- uint32_t &chunk_visit_offset,
- ChunkHeader &chunk_header);
- int read_from_file_and_rewrap(common::ByteStream &in_stream_,
- ChunkMeta *&chunk_meta,
- uint32_t &chunk_visit_offset,
- int32_t &file_data_buf_size,
+ int get_cur_page_header(ChunkMeta*& chunk_meta,
+ common::ByteStream& in_stream_,
+ PageHeader& cur_page_header_,
+ uint32_t& chunk_visit_offset,
+ ChunkHeader& chunk_header);
+ int read_from_file_and_rewrap(common::ByteStream& in_stream_,
+ ChunkMeta*& chunk_meta,
+ uint32_t& chunk_visit_offset,
+ int32_t& file_data_buf_size,
int want_size = 0, bool may_shrink = true);
- bool cur_page_statisify_filter(Filter *filter);
+ bool cur_page_statisify_filter(Filter* filter);
int skip_cur_page();
int decode_cur_time_page_data();
int decode_cur_value_page_data();
- int decode_time_value_buf_into_tsblock(common::TsBlock *&ret_tsblock,
- Filter *filter,
- common::PageArena *pa);
+ int decode_time_value_buf_into_tsblock(common::TsBlock*& ret_tsblock,
+ Filter* filter,
+ common::PageArena* pa);
bool prev_time_page_not_finish() const {
return (time_decoder_ && time_decoder_->has_remaining(time_in_)) ||
time_in_.has_remaining();
@@ -114,25 +117,25 @@ class AlignedChunkReader : public IChunkReader {
value_in_.has_remaining();
}
- int decode_tv_buf_into_tsblock_by_datatype(common::ByteStream &time_in,
- common::ByteStream &value_in,
- common::TsBlock *ret_tsblock,
- Filter *filter,
- common::PageArena *pa);
- int i32_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream &time_in,
- common::ByteStream &value_in,
- common::RowAppender &row_appender,
- Filter *filter);
- int STRING_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream &time_in,
- common::ByteStream &value_in,
- common::RowAppender &row_appender,
- common::PageArena &pa,
- Filter *filter);
+ int decode_tv_buf_into_tsblock_by_datatype(common::ByteStream& time_in,
+ common::ByteStream& value_in,
+ common::TsBlock* ret_tsblock,
+ Filter* filter,
+ common::PageArena* pa);
+ int i32_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in,
+ common::ByteStream& value_in,
+ common::RowAppender& row_appender,
+ Filter* filter);
+ int STRING_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in,
+ common::ByteStream& value_in,
+ common::RowAppender& row_appender,
+ common::PageArena& pa,
+ Filter* filter);
private:
- ReadFile *read_file_;
- ChunkMeta *time_chunk_meta_;
- ChunkMeta *value_chunk_meta_;
+ ReadFile* read_file_;
+ ChunkMeta* time_chunk_meta_;
+ ChunkMeta* value_chunk_meta_;
common::String measurement_name_;
ChunkHeader time_chunk_header_;
// TODO: support reading more than one measurement in AlignedChunkReader.
@@ -161,16 +164,16 @@ class AlignedChunkReader : public IChunkReader {
uint32_t value_chunk_visit_offset_;
// Statistic *page_statistic_;
- Compressor *time_compressor_;
- Compressor *value_compressor_;
- Filter *time_filter_;
+ Compressor* time_compressor_;
+ Compressor* value_compressor_;
+ Filter* time_filter_;
- Decoder *time_decoder_;
- Decoder *value_decoder_;
+ Decoder* time_decoder_;
+ Decoder* value_decoder_;
common::ByteStream time_in_;
common::ByteStream value_in_;
- char *time_uncompressed_buf_;
- char *value_uncompressed_buf_;
+ char* time_uncompressed_buf_;
+ char* value_uncompressed_buf_;
std::vector<uint8_t> value_page_col_notnull_bitmap_;
uint32_t value_page_data_num_;
int32_t cur_value_index;
diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h
index 228a9333..87303cef 100644
--- a/cpp/src/reader/result_set.h
+++ b/cpp/src/reader/result_set.h
@@ -261,6 +261,8 @@ class ResultSetIterator {
if (result_set_) {
int ret = result_set_->next(has_next);
ASSERT(ret == 0);
+ // TODO:handle error in hasNext.
+ (void)ret;
if (has_next) {
cached_record_ = result_set_->get_row_record();
} else {
@@ -297,7 +299,8 @@ inline ResultSetIterator ResultSet::iterator() {
return ResultSetIterator(this);
}
-static void print_table_result_set(storage::ResultSet* table_result_set) {
+static MAYBE_UNUSED void print_table_result_set(
+ storage::ResultSet* table_result_set) {
if (table_result_set == nullptr) {
std::cout << "TableResultSet is nullptr" << std::endl;
return;
diff --git a/cpp/src/utils/errno_define.h b/cpp/src/utils/errno_define.h
index df16c5fe..f52cead9 100644
--- a/cpp/src/utils/errno_define.h
+++ b/cpp/src/utils/errno_define.h
@@ -43,9 +43,7 @@ const int E_COND_ERR = 19;
const int E_OVERFLOW = 20;
const int E_NO_MORE_DATA = 21;
const int E_OUT_OF_ORDER = 22;
-const int E_TSBLOCK_TYPE_NOT_SUPPORTED = 23;
-const int E_TSBLOCK_DATA_INCONSISTENCY = 24;
-const int E_DDL_UNKNOWN_TYPE = 25;
+const int E_DATA_INCONSISTENCY = 24;
const int E_TYPE_NOT_SUPPORTED = 26;
const int E_TYPE_NOT_MATCH = 27;
const int E_FILE_OPEN_ERR = 28;
diff --git a/cpp/src/utils/util_define.h b/cpp/src/utils/util_define.h
index 9667de76..2796dfb0 100644
--- a/cpp/src/utils/util_define.h
+++ b/cpp/src/utils/util_define.h
@@ -25,6 +25,13 @@
/* ======== unsued ======== */
#define UNUSED(v) ((void)(v))
+// MAYBE_UNUSED marks a declaration that may legitimately go unused (for
+// example a `static` helper defined in a header) so the compiler does not
+// warn about it:
+//   * C++17 or later (__cplusplus >= 201703L): standard [[maybe_unused]];
+//   * GCC/Clang before C++17: __attribute__((unused));
+//   * anything else: expands to nothing.
+// Put it in front of all other declaration specifiers, e.g.
+// `MAYBE_UNUSED static void f();`. The C++ grammar only accepts a standard
+// attribute at the start of a declaration or after the last decl-specifier,
+// so `static MAYBE_UNUSED void f();` may be rejected or ignored when the
+// [[maybe_unused]] branch is active.
+// NOTE(review): MSVC reports __cplusplus as 199711L unless /Zc:__cplusplus
+// is passed, so it normally takes the empty branch -- confirm if MSVC
+// builds need the warning suppressed.
+#if __cplusplus >= 201703L
+#define MAYBE_UNUSED [[maybe_unused]]
+#elif defined(__GNUC__) || defined(__clang__)
+#define MAYBE_UNUSED __attribute__((unused))
+#else
+#define MAYBE_UNUSED
+#endif
/* ======== inline ======== */
#ifdef __GNUC__
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index b123ef69..c281de41 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -477,3 +477,233 @@ TEST_F(TsFileTableReaderTest, TestDecoder) {
reader.destroy_query_data_set(table_result_set);
reader.close();
}
+
+// Writes one table "t1" (TAG columns id1/id2, FIELD columns s1..s5) with
+// TsFileTableWriter, filling the tablet through `insert_data_into_tablet`,
+// then re-opens the file, queries every column over the full time range and
+// hands the TableResultSet to `check`.
+//
+// @param write_file               destination file; its path is re-opened
+//                                 for reading after the writer is closed.
+// @param max_rows                 tablet capacity, forwarded to both
+//                                 callbacks.
+// @param insert_data_into_tablet  populates the tablet (rows [0, max_rows)).
+// @param check                    assertions over the query result.
+static void test_null_table(
+    WriteFile* write_file, int max_rows,
+    const std::function<void(Tablet*, int)>& insert_data_into_tablet,
+    const std::function<void(TableResultSet*, int)>& check) {
+    const std::string table_name = "t1";
+    auto* schema = new storage::TableSchema(
+        table_name,
+        {
+            common::ColumnSchema("id1", common::TSDataType::STRING,
+                                 common::CompressionType::UNCOMPRESSED,
+                                 common::TSEncoding::PLAIN,
+                                 common::ColumnCategory::TAG),
+            common::ColumnSchema("id2", common::TSDataType::STRING,
+                                 common::CompressionType::UNCOMPRESSED,
+                                 common::TSEncoding::PLAIN,
+                                 common::ColumnCategory::TAG),
+            common::ColumnSchema("s1", common::TSDataType::INT64,
+                                 common::CompressionType::UNCOMPRESSED,
+                                 common::TSEncoding::PLAIN,
+                                 common::ColumnCategory::FIELD),
+            common::ColumnSchema("s2", common::TSDataType::INT32,
+                                 common::CompressionType::UNCOMPRESSED,
+                                 common::TSEncoding::PLAIN,
+                                 common::ColumnCategory::FIELD),
+            common::ColumnSchema("s3", common::TSDataType::FLOAT,
+                                 common::CompressionType::UNCOMPRESSED,
+                                 common::TSEncoding::PLAIN,
+                                 common::ColumnCategory::FIELD),
+            common::ColumnSchema("s4", common::TSDataType::DOUBLE,
+                                 common::CompressionType::UNCOMPRESSED,
+                                 common::TSEncoding::PLAIN,
+                                 common::ColumnCategory::FIELD),
+            common::ColumnSchema("s5", common::TSDataType::STRING,
+                                 common::CompressionType::UNCOMPRESSED,
+                                 common::TSEncoding::PLAIN,
+                                 common::ColumnCategory::FIELD),
+        });
+    const uint64_t memory_threshold = 128 * 1024 * 1024;
+    auto* writer =
+        new storage::TsFileTableWriter(write_file, schema, memory_threshold);
+    storage::Tablet tablet(
+        {
+            "id1",
+            "id2",
+            "s1",
+            "s2",
+            "s3",
+            "s4",
+            "s5",
+        },
+        {
+            common::TSDataType::STRING,
+            common::TSDataType::STRING,
+            common::TSDataType::INT64,
+            common::TSDataType::INT32,
+            common::TSDataType::FLOAT,
+            common::TSDataType::DOUBLE,
+            common::TSDataType::STRING,
+        },
+        max_rows);
+    insert_data_into_tablet(&tablet, max_rows);
+    // EXPECT rather than ASSERT: a failure must not skip the deletes below.
+    EXPECT_EQ(writer->write_table(tablet), common::E_OK);
+    EXPECT_EQ(writer->flush(), common::E_OK);
+    EXPECT_EQ(writer->close(), common::E_OK);
+    delete writer;
+    delete schema;
+
+    storage::TsFileReader reader;
+    ASSERT_EQ(reader.open(write_file->get_file_path()), common::E_OK);
+    const std::int64_t start_time = INT64_MIN;
+    const std::int64_t end_time = INT64_MAX;
+    storage::ResultSet* temp_ret = nullptr;
+    const int query_ret =
+        reader.query(table_name, {"id1", "id2", "s1", "s2", "s3", "s4", "s5"},
+                     start_time, end_time, temp_ret);
+    EXPECT_EQ(query_ret, common::E_OK);
+    auto* ret = dynamic_cast<storage::TableResultSet*>(temp_ret);
+    // A failed query or an unexpected result-set type must fail the test,
+    // not crash the test binary with a null dereference.
+    EXPECT_TRUE(ret != nullptr);
+    if (ret != nullptr) {
+        check(ret, max_rows);
+        ret->close();
+        reader.destroy_query_data_set(ret);
+    }
+    reader.close();
+}
+
+TEST_F(TsFileTableReaderTest, TestNullInTable) {
+    // 1. Both TAG columns are written on every row; FIELD columns are written
+    //    only on even timestamps, so odd rows have every FIELD column null.
+    test_null_table(
+        &write_file_, 10,
+        [](Tablet* tablet, int max_rows) {
+            for (int row = 0; row < max_rows; row++) {
+                int64_t timestamp = row;
+                tablet->add_timestamp(row, timestamp);
+                tablet->add_value(row, "id1", "id1");
+                tablet->add_value(row, "id2", "id2");
+                if (row % 2 == 0) {
+                    tablet->add_value(row, "s1", static_cast<int64_t>(row));
+                    tablet->add_value(row, "s2", 1);
+                    tablet->add_value(row, "s3", 1.1f);
+                    tablet->add_value(row, "s4", 1.2);
+                    tablet->add_value(row, "s5", "test");
+                }
+            }
+        },
+        [](TableResultSet* result, int max_rows) {
+            bool has_next = false;
+            int line = 0;
+            while ((result->next(has_next)) == common::E_OK && has_next) {
+                line++;
+                // Check both directions: odd rows must read back as null and
+                // even rows must keep their values, so a reader that reports
+                // every field as null no longer passes.
+                const bool fields_null =
+                    result->get_value<int64_t>(1) % 2 != 0;
+                ASSERT_EQ(result->is_null("s1"), fields_null);
+                ASSERT_EQ(result->is_null("s2"), fields_null);
+                ASSERT_EQ(result->is_null("s3"), fields_null);
+                ASSERT_EQ(result->is_null("s4"), fields_null);
+                ASSERT_EQ(result->is_null("s5"), fields_null);
+                ASSERT_FALSE(result->is_null("id1"));
+                ASSERT_FALSE(result->is_null("id2"));
+            }
+            ASSERT_EQ(line, max_rows);
+        });
+}
+
+TEST_F(TsFileTableReaderTest, TestNullInTable2) {
+    // 2. Rows alternate between "TAG columns only" (even timestamps) and
+    //    "FIELD columns only" (odd timestamps); the reader must report the
+    //    missing half of each row as null.
+    auto fill_alternating = [](Tablet* tablet, int row_count) {
+        for (int r = 0; r < row_count; ++r) {
+            tablet->add_timestamp(r, static_cast<int64_t>(r));
+            if (r % 2 != 0) {
+                // Odd row: FIELD values only.
+                tablet->add_value(r, "s1", static_cast<int64_t>(r));
+                tablet->add_value(r, "s2", 1);
+                tablet->add_value(r, "s3", 1.1f);
+                tablet->add_value(r, "s4", 1.2);
+                tablet->add_value(r, "s5", "test");
+                continue;
+            }
+            // Even row: TAG values only.
+            tablet->add_value(r, "id1", "id1");
+            tablet->add_value(r, "id2", "id2");
+        }
+    };
+    auto verify_alternating = [](TableResultSet* rs, int expected_rows) {
+        bool has_next = false;
+        int seen = 0;
+        while (rs->next(has_next) == common::E_OK && has_next) {
+            ++seen;
+            const bool tag_only_row = rs->get_value<int64_t>(1) % 2 == 0;
+            for (const char* field : {"s1", "s2", "s3", "s4", "s5"}) {
+                ASSERT_EQ(rs->is_null(field), tag_only_row);
+            }
+            for (const char* tag : {"id1", "id2"}) {
+                ASSERT_EQ(rs->is_null(tag), !tag_only_row);
+            }
+        }
+        ASSERT_EQ(seen, expected_rows);
+    };
+    test_null_table(&write_file_, 10, fill_alternating, verify_alternating);
+}
+
+TEST_F(TsFileTableReaderTest, TestNullInTable3) {
+    // 3. Odd rows carry only a timestamp, so every TAG and FIELD column is
+    //    null there; even rows have every column populated.
+    auto populate = [](Tablet* tablet, int row_count) {
+        for (int r = 0; r < row_count; ++r) {
+            tablet->add_timestamp(r, static_cast<int64_t>(r));
+            if (r % 2 != 0) {
+                continue;  // timestamp-only row
+            }
+            tablet->add_value(r, "id1", "id1");
+            tablet->add_value(r, "id2", "id2");
+            tablet->add_value(r, "s1", static_cast<int64_t>(r));
+            tablet->add_value(r, "s2", 1);
+            tablet->add_value(r, "s3", 1.1f);
+            tablet->add_value(r, "s4", 1.2);
+            tablet->add_value(r, "s5", "test");
+        }
+    };
+    auto verify = [](TableResultSet* rs, int expected_rows) {
+        bool has_next = false;
+        int seen = 0;
+        while (rs->next(has_next) == common::E_OK && has_next) {
+            ++seen;
+            const bool empty_row = rs->get_value<int64_t>(1) % 2 != 0;
+            for (const char* column :
+                 {"s1", "s2", "s3", "s4", "s5", "id1", "id2"}) {
+                ASSERT_EQ(rs->is_null(column), empty_row);
+            }
+        }
+        ASSERT_EQ(seen, expected_rows);
+    };
+    test_null_table(&write_file_, 10, populate, verify);
+}
+
+TEST_F(TsFileTableReaderTest, TestNullInTable4) {
+    // 4. A large tablet where only the first kRowsWithFields rows carry FIELD
+    //    values; every later row has all FIELD columns null, while both TAG
+    //    columns are written on every row. This exercises long runs of null
+    //    FIELD values.
+    static constexpr int kRowCount = 1000000;
+    static constexpr int kRowsWithFields = 10;
+    test_null_table(
+        &write_file_, kRowCount,
+        [](Tablet* tablet, int max_rows) {
+            for (int row = 0; row < max_rows; row++) {
+                int64_t timestamp = row;
+                tablet->add_timestamp(row, timestamp);
+                tablet->add_value(row, "id1", "id1");
+                tablet->add_value(row, "id2", "id2");
+                if (row < kRowsWithFields) {
+                    tablet->add_value(row, "s1", static_cast<int64_t>(row));
+                    tablet->add_value(row, "s2", 1);
+                    tablet->add_value(row, "s3", 1.1f);
+                    tablet->add_value(row, "s4", 1.2);
+                    tablet->add_value(row, "s5", "test");
+                }
+            }
+        },
+        [](TableResultSet* result, int max_rows) {
+            bool has_next = false;
+            int line = 0;
+            while ((result->next(has_next)) == common::E_OK && has_next) {
+                line++;
+                const bool available =
+                    result->get_value<int64_t>(1) < kRowsWithFields;
+                ASSERT_EQ(!result->is_null("s1"), available);
+                ASSERT_EQ(!result->is_null("s2"), available);
+                ASSERT_EQ(!result->is_null("s3"), available);
+                ASSERT_EQ(!result->is_null("s4"), available);
+                ASSERT_EQ(!result->is_null("s5"), available);
+                // TAG columns were written on every row.
+                ASSERT_FALSE(result->is_null("id1"));
+                ASSERT_FALSE(result->is_null("id2"));
+            }
+            ASSERT_EQ(line, max_rows);
+        });
+}