This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_support_read_tree in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 0486f35e56960e25069b5b8b14133d47f8452d88 Author: colin <[email protected]> AuthorDate: Tue Nov 18 17:50:09 2025 +0800 support read tree. --- cpp/src/reader/table_query_executor.cc | 78 ++++++++++++++++++++++++++++++++++ cpp/src/reader/table_query_executor.h | 4 ++ cpp/src/reader/tsfile_reader.cc | 37 +++++++++++++++- cpp/src/reader/tsfile_reader.h | 3 ++ cpp/test/reader/tsfile_reader_test.cc | 29 +++++++++++++ 5 files changed, 149 insertions(+), 2 deletions(-) diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index d09a5c90..e99fd18c 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -19,6 +19,8 @@ #include "reader/table_query_executor.h" +#include "utils/db_utils.h" + namespace storage { int TableQueryExecutor::query(const std::string &table_name, const std::vector<std::string> &columns, @@ -83,6 +85,126 @@ int TableQueryExecutor::query(const std::string &table_name, ret_qds = new TableResultSet(std::move(tsblock_reader), columns, data_types); return ret; +}
+
+/**
+ * @brief Derive the table-style column schema for reading tree-model devices.
+ *
+ * Each tag column becomes a STRING column of category TAG. Each field column
+ * takes the data type of the first timeseries with that measurement name
+ * found while scanning @p devices, after widening INT32/INT64/TIMESTAMP/DATE
+ * to INT64 and FLOAT to DOUBLE. Fields found on no device get
+ * INVALID_DATATYPE.
+ *
+ * NOTE(review): work in progress. Only the schema is computed; no reader or
+ * result set is built yet, so @p ret_qds is always nullptr on return and
+ * callers must check it before use.
+ *
+ * @param devices devices whose timeseries indexes are inspected
+ * @param tag_columns names of the tag columns
+ * @param field_columns measurement names exposed as field columns
+ * @param time_filter not used yet. NOTE(review): the caller allocates it with
+ *        new; decide which side releases it once a reader consumes it.
+ * @param ret_qds [out] set to nullptr
+ * @return common::E_OK; failed per-device lookups are skipped, not reported
+ */
+int TableQueryExecutor::query_on_tree(
+    const std::vector<std::shared_ptr<IDeviceID>> &devices,
+    const std::vector<std::string> &tag_columns,
+    const std::vector<std::string> &field_columns, Filter *time_filter,
+    ResultSet *&ret_qds) {
+    // Never leave the out-parameter indeterminate.
+    ret_qds = nullptr;
+    (void)time_filter;
+
+    common::PageArena pa;
+    pa.init(512, common::MOD_TSFILE_READER);
+    TsFileMeta *file_meta = tsfile_io_reader_->get_tsfile_meta();
+
+    // One entry per device, index-aligned with `devices`; nullptr marks a
+    // device whose table meta-index node could not be resolved.
+    std::vector<MetaIndexNode *> table_inodes;
+    table_inodes.reserve(devices.size());
+    for (const auto &device : devices) {
+        MetaIndexNode *table_inode = nullptr;
+        if (file_meta == nullptr ||
+            file_meta->get_table_metaindex_node(device->get_table_name(),
+                                                table_inode) != common::E_OK) {
+            table_inode = nullptr;
+        }
+        table_inodes.push_back(table_inode);
+    }
+
+    std::vector<common::ColumnSchema> col_schema;
+    col_schema.reserve(tag_columns.size() + field_columns.size());
+    for (const auto &tag : tag_columns) {
+        col_schema.emplace_back(tag, common::TSDataType::STRING,
+                                common::ColumnCategory::TAG);
+    }
+
+    // Field name -> widened data type of the first timeseries seen for it.
+    std::unordered_map<std::string, common::TSDataType> column_types_map;
+    for (const auto &device : devices) {
+        // Stop scanning devices once every requested field has a type.
+        bool all_collected = true;
+        for (const auto &field_col : field_columns) {
+            if (column_types_map.find(field_col) == column_types_map.end()) {
+                all_collected = false;
+                break;
+            }
+        }
+        if (all_collected) {
+            break;
+        }
+
+        std::unordered_set<std::string> measurements(field_columns.begin(),
+                                                     field_columns.end());
+        std::vector<ITimeseriesIndex *> index(measurements.size());
+        // Best effort: skip a device whose indexes cannot be fetched.
+        if (tsfile_io_reader_->get_timeseries_indexes(device, measurements,
+                                                      index, pa) !=
+            common::E_OK) {
+            continue;
+        }
+
+        for (auto *ts_index : index) {
+            if (ts_index == nullptr) {
+                continue;
+            }
+            const std::string measurement_name =
+                ts_index->get_measurement_name().to_std_string();
+            if (column_types_map.find(measurement_name) !=
+                column_types_map.end()) {
+                continue;
+            }
+            common::TSDataType type = ts_index->get_data_type();
+            if (type == common::TSDataType::INT32 ||
+                type == common::TSDataType::INT64 ||
+                type == common::TSDataType::TIMESTAMP ||
+                type == common::TSDataType::DATE) {
+                type = common::TSDataType::INT64;
+            } else if (type == common::TSDataType::FLOAT) {
+                type = common::TSDataType::DOUBLE;
+            }
+            column_types_map[measurement_name] = type;
+        }
+    }
+
+    for (const auto &field_col : field_columns) {
+        const auto found = column_types_map.find(field_col);
+        const common::TSDataType type =
+            found != column_types_map.end()
+                ? found->second
+                : common::TSDataType::INVALID_DATATYPE;
+        col_schema.emplace_back(field_col, type,
+                                common::ColumnCategory::FIELD);
+    }
+
+    // TODO: build the reader and ResultSet from `schema` and `table_inodes`.
+    TableSchema schema = TableSchema("default", col_schema);
+    (void)schema;
+    (void)table_inodes;
+    return common::E_OK;
 } void TableQueryExecutor::destroy_query_data_set(ResultSet *qds) { delete qds; } diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 83a82fe5..bf33f7c8 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -65,6 +65,10 @@ class TableQueryExecutor { int query(const std::string &table_name, const std::vector<std::string> &columns, Filter *time_filter,
Filter *id_filter, Filter *field_filter, ResultSet *&ret_qds); + int query_on_tree(const std::vector<std::shared_ptr<IDeviceID>>& devices, + const std::vector<std::string> &tag_columns, + const std::vector<std::string> &field_columns, + Filter *time_filter, ResultSet *&ret_qds); void destroy_query_data_set(ResultSet *qds); private: diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index 6da09430..0f66fe18 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -99,8 +99,6 @@ int TsFileReader::query(const std::string& table_name, return E_TABLE_NOT_EXIST; } - std::vector<TSDataType> data_types = table_schema->get_data_types(); - Filter* time_filter = new TimeBetween(start_time, end_time, false); ret = table_query_executor_->query(to_lower(table_name), columns_names, @@ -108,6 +106,71 @@ int TsFileReader::query(const std::string& table_name, return ret; }
+/**
+ * @brief Query a tree-model file as if it were a single table.
+ *
+ * Every device that owns at least one of @p measurement_names is selected
+ * exactly once. The largest segment count among the selected device ids
+ * decides how many tag columns are generated; they are named "L_0", "L_1",
+ * and so on. Devices, tag column names and measurements are then handed to
+ * TableQueryExecutor::query_on_tree together with a TimeBetween filter.
+ *
+ * @param measurement_names measurements to expose as field columns
+ * @param star_time first bound given to the TimeBetween filter
+ * @param end_time second bound given to the TimeBetween filter
+ * @param result_set [out] forwarded to TableQueryExecutor::query_on_tree
+ * @return E_TSFILE_WRITER_META_ERR when the file meta is unavailable,
+ *         otherwise the code returned by TableQueryExecutor::query_on_tree
+ */
+int TsFileReader::query_table_on_tree(
+    const std::vector<std::string>& measurement_names, int64_t star_time,
+    int64_t end_time, ResultSet*& result_set) {
+    if (tsfile_executor_->get_tsfile_meta() == nullptr) {
+        return E_TSFILE_WRITER_META_ERR;
+    }
+
+    std::vector<std::shared_ptr<IDeviceID>> device_ids;
+    size_t max_len = 0;
+    auto device_names = this->get_all_device_ids();
+    for (auto& device_name : device_names) {
+        std::vector<MeasurementSchema> schemas;
+        // Best effort: a device whose schema cannot be read is skipped.
+        if (this->get_timeseries_schema(device_name, schemas) != E_OK) {
+            continue;
+        }
+        const bool wanted = std::any_of(
+            schemas.begin(), schemas.end(),
+            [&measurement_names](const MeasurementSchema& schema) {
+                return std::find(measurement_names.begin(),
+                                 measurement_names.end(),
+                                 schema.measurement_name_) !=
+                       measurement_names.end();
+            });
+        if (!wanted) {
+            continue;
+        }
+        // Select the device once, however many measurements matched.
+        device_ids.push_back(device_name);
+        if (device_name->get_segments().size() > max_len) {
+            max_len = device_name->get_segments().size();
+        }
+    }
+
+    // One generated tag column per device-id segment: "L_0", "L_1", ...
+    std::vector<std::string> columns_names;
+    columns_names.reserve(max_len);
+    for (size_t i = 0; i < max_len; ++i) {
+        columns_names.push_back("L_" + std::to_string(i));
+    }
+
+    // NOTE(review): the filter is allocated with new and passed on, the same
+    // pattern query() uses; confirm which side releases it.
+    Filter* time_filter = new TimeBetween(star_time, end_time, false);
+    return table_query_executor_->query_on_tree(
+        device_ids, columns_names, measurement_names, time_filter,
+        result_set);
+}
+
 void TsFileReader::destroy_query_data_set(storage::ResultSet* qds) { tsfile_executor_->destroy_query_data_set(qds); } diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index eb6a7b70..1344309a 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -96,6 +96,9 @@ class TsFileReader { int query(const std::string &table_name, const std::vector<std::string> &columns_names, int64_t start_time, int64_t end_time, ResultSet *&result_set); + + int query_table_on_tree(const std::vector<std::string> &measurement_names, int64_t star_time, + int64_t end_time, ResultSet *&result_set); /** * @brief destroy the result set, this method should be called after the * query is finished and result_set diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc index c2dfe2ce..ce7f5631 100644 --- a/cpp/test/reader/tsfile_reader_test.cc +++ b/cpp/test/reader/tsfile_reader_test.cc @@ -195,6 +195,35 @@ TEST_F(TsFileReaderTest, GetAllDevice) { } } +TEST_F(TsFileReaderTest, ReadTableFromTree) { + std::vector<std::string> device_path = {"root.db.t1.t2", "root.db2.t1.t2", + "device.t1", "device.ln.t2"}; + std::vector<std::string> measurements = {"temperature", "humidity"}; + common::TSDataType data_type = common::TSDataType::INT32; + common::TSEncoding encoding = common::TSEncoding::PLAIN; + common::CompressionType compression_type = + common::CompressionType::UNCOMPRESSED; + for (auto const& device : device_path) { + TsRecord record_0(1622505600000, device); + for (auto const& measurement : measurements) { + tsfile_writer_->register_timeseries(device, storage::MeasurementSchema( + measurement, data_type, encoding, compression_type + )); + record_0.add_point(measurement, (int32_t)0); + } +
ASSERT_EQ(tsfile_writer_->write_record(record_0), E_OK); + + } + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + storage::ResultSet* tmp_qds = nullptr; + reader.query_table_on_tree({"temperature", "humidity"}, INT64_MIN, INT64_MAX, tmp_qds); + +} + TEST_F(TsFileReaderTest, GetTimeseriesSchema) { std::vector<std::string> device_path = {"device", "device.ln"}; std::vector<std::string> measurement_name = {"temperature", "humidity"};
