This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_support_read_tree in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit f1247e4cad0282dc4bb7b5c35539968d64da67ef Author: ColinLee <[email protected]> AuthorDate: Tue Nov 18 23:26:52 2025 +0800 support read tree by table. --- cpp/src/reader/device_meta_iterator.h | 9 + cpp/src/reader/imeta_data_querier.h | 50 +++--- cpp/src/reader/meta_data_querier.cc | 177 ++++++++++--------- cpp/src/reader/meta_data_querier.h | 79 +++++---- cpp/src/reader/table_query_executor.cc | 273 ++++++++++++++++------------- cpp/src/reader/task/device_task_iterator.h | 82 +++++---- 6 files changed, 363 insertions(+), 307 deletions(-) diff --git a/cpp/src/reader/device_meta_iterator.h b/cpp/src/reader/device_meta_iterator.h index 55f20913..5a9d27ae 100644 --- a/cpp/src/reader/device_meta_iterator.h +++ b/cpp/src/reader/device_meta_iterator.h @@ -37,6 +37,15 @@ class DeviceMetaIterator { pa_.init(512, common::MOD_DEVICE_META_ITER); } + DeviceMetaIterator(TsFileIOReader *io_reader, + std::vector<MetaIndexNode*> meta_index_node_list, + const Filter *id_filter): io_reader_(io_reader) { + for (auto meta_index_node : meta_index_node_list) { + meta_index_nodes_.push(meta_index_node); + } + pa_.init(512, common::MOD_DEVICE_META_ITER); + } + ~DeviceMetaIterator() { pa_.destroy(); } bool has_next(); diff --git a/cpp/src/reader/imeta_data_querier.h b/cpp/src/reader/imeta_data_querier.h index 73a005e8..d95e99ae 100644 --- a/cpp/src/reader/imeta_data_querier.h +++ b/cpp/src/reader/imeta_data_querier.h @@ -26,38 +26,40 @@ #include "reader/device_meta_iterator.h" namespace storage { + class IMetadataQuerier { + public: + virtual ~IMetadataQuerier() = default; -class IMetadataQuerier { - public: - virtual ~IMetadataQuerier() = default; + virtual std::vector<std::shared_ptr<ChunkMeta> > get_chunk_metadata_list( + const Path &path) const = 0; - virtual std::vector<std::shared_ptr<ChunkMeta>> get_chunk_metadata_list( - const Path& path) const = 0; + virtual std::vector<std::vector<std::shared_ptr<ChunkMeta> > > + get_chunk_metadata_lists( + std::shared_ptr<IDeviceID> device_id, + const std::unordered_set<std::string> &field_names, + const MetaIndexNode *field_node = nullptr) const = 0; - virtual std::vector<std::vector<std::shared_ptr<ChunkMeta>>> - get_chunk_metadata_lists( - std::shared_ptr<IDeviceID> device_id, - const std::unordered_set<std::string>& field_names, - const MetaIndexNode* field_node = nullptr) const = 0; + virtual std::map<Path, std::vector<std::shared_ptr<ChunkMeta> > > + get_chunk_metadata_map(const std::vector<Path> &paths) const = 0; - virtual std::map<Path, std::vector<std::shared_ptr<ChunkMeta>>> - get_chunk_metadata_map(const std::vector<Path>& paths) const = 0; + virtual int get_whole_file_metadata(TsFileMeta *tsfile_meta) const = 0; - virtual int get_whole_file_metadata(TsFileMeta* tsfile_meta) const = 0; + virtual void load_chunk_metadatas(const std::vector<Path> &paths) = 0; - virtual void load_chunk_metadatas(const std::vector<Path>& paths) = 0; + virtual common::TSDataType get_data_type(const Path &path) const = 0; - virtual common::TSDataType get_data_type(const Path& path) const = 0; + virtual std::vector<TimeRange> convert_space_to_time_partition( + const std::vector<Path> &paths, int64_t space_partition_start_pos, + int64_t space_partition_end_pos) const = 0; - virtual std::vector<TimeRange> convert_space_to_time_partition( - const std::vector<Path>& paths, int64_t space_partition_start_pos, - int64_t space_partition_end_pos) const = 0; + virtual void clear() = 0; - virtual void clear() = 0; + virtual std::unique_ptr<DeviceMetaIterator> device_iterator( + MetaIndexNode *root, const Filter *id_filter) = 0; - virtual std::unique_ptr<DeviceMetaIterator> device_iterator( - MetaIndexNode* root, const Filter* id_filter) = 0; -}; - -} // end namespace storage + // FIXME(Colin): refine this. + virtual std::unique_ptr<DeviceMetaIterator> device_iterator( + std::vector<MetaIndexNode *> root, const Filter *id_filter) = 0; + }; +} // end namespace storage #endif // READER_IMETA_DATA_QUERIER_H diff --git a/cpp/src/reader/meta_data_querier.cc b/cpp/src/reader/meta_data_querier.cc index 5a32b922..2fbd3abf 100644 --- a/cpp/src/reader/meta_data_querier.cc +++ b/cpp/src/reader/meta_data_querier.cc @@ -22,88 +22,95 @@ #include "device_meta_iterator.h" namespace storage { - -MetadataQuerier::MetadataQuerier(TsFileIOReader* tsfile_io_reader) - : io_reader_(tsfile_io_reader) { - file_metadata_ = io_reader_->get_tsfile_meta(); - device_chunk_meta_cache_ = std::unique_ptr< - common::Cache<std::string /*ToDO: Device ID*/, - std::vector<std::shared_ptr<ChunkMeta>>, std::mutex>>( - new common::Cache<std::string, std::vector<std::shared_ptr<ChunkMeta>>, - std::mutex>(CACHED_ENTRY_NUMBER, - CACHED_ENTRY_NUMBER / 10)); -} - -MetadataQuerier::~MetadataQuerier() {} - -std::vector<std::shared_ptr<ChunkMeta>> -MetadataQuerier::get_chunk_metadata_list(const Path& path) const { - // std::vector<std::shared_ptr<ChunkMeta>> chunk_meta_list; - // if (device_chunk_meta_cache_->tryGet(path.device_, chunk_meta_list)) { - // return chunk_meta_list; - // } else { - // io_reader_->get_chunk_metadata_list(path.device_, path.measurement_, - // chunk_meta_list); - // } - // return io_reader_->get_chunk_metadata_list(path); - ASSERT(false); - return {}; -} - -std::vector<std::vector<std::shared_ptr<ChunkMeta>>> -MetadataQuerier::get_chunk_metadata_lists( - std::shared_ptr<IDeviceID> device_id, - const std::unordered_set<std::string>& field_names, - const MetaIndexNode* field_node) const { - // return io_reader_->get_chunk_metadata_lists(device_id, field_names, - // field_node); - ASSERT(false); - return {}; -} - -std::map<Path, std::vector<std::shared_ptr<ChunkMeta>>> -MetadataQuerier::get_chunk_metadata_map(const std::vector<Path>& paths) const { - // return io_reader_->get_chunk_metadata_map(paths); - ASSERT(false); - return {}; -} - -int MetadataQuerier::get_whole_file_metadata(TsFileMeta* tsfile_meta) const { - tsfile_meta = io_reader_->get_tsfile_meta(); - return common::E_OK; -} - -void MetadataQuerier::load_chunk_metadatas(const std::vector<Path>& paths) { - // io_reader_->load_chunk_metadatas(paths); - ASSERT(false); -} - -common::TSDataType MetadataQuerier::get_data_type(const Path& path) const { - ASSERT(false); - return common::INVALID_DATATYPE; -} - -std::vector<TimeRange> MetadataQuerier::convert_space_to_time_partition( - const std::vector<Path>& paths, int64_t spacePartitionStartPos, - int64_t spacePartitionEndPos) const { - ASSERT(false); - return {}; -} - -void MetadataQuerier::clear() {} - -std::unique_ptr<DeviceMetaIterator> MetadataQuerier::device_iterator( - MetaIndexNode* root, const Filter* id_filter) { - return std::unique_ptr<DeviceMetaIterator>( - new DeviceMetaIterator(io_reader_, root, id_filter)); -} - -int MetadataQuerier::load_chunk_meta( - const std::pair<IDeviceID, std::string>& key, - std::vector<ChunkMeta*>& chunk_meta_list) { - // return io_reader_->load_chunk_meta(key, chunk_meta_list); - ASSERT(false); - return common::E_NOT_SUPPORT; -} - -} // end namespace storage + MetadataQuerier::MetadataQuerier(TsFileIOReader *tsfile_io_reader) + : io_reader_(tsfile_io_reader) { + file_metadata_ = io_reader_->get_tsfile_meta(); + device_chunk_meta_cache_ = std::unique_ptr< + common::Cache<std::string /*ToDO: Device ID*/, + std::vector<std::shared_ptr<ChunkMeta> >, std::mutex> >( + new common::Cache<std::string, std::vector<std::shared_ptr<ChunkMeta> >, + std::mutex>(CACHED_ENTRY_NUMBER, + CACHED_ENTRY_NUMBER / 10)); + } + + MetadataQuerier::~MetadataQuerier() { + } + + std::vector<std::shared_ptr<ChunkMeta> > + MetadataQuerier::get_chunk_metadata_list(const Path &path) const { + // std::vector<std::shared_ptr<ChunkMeta>> chunk_meta_list; + // if (device_chunk_meta_cache_->tryGet(path.device_, chunk_meta_list)) { + // return chunk_meta_list; + // } else { + // io_reader_->get_chunk_metadata_list(path.device_, path.measurement_, + // chunk_meta_list); + // } + // return io_reader_->get_chunk_metadata_list(path); + ASSERT(false); + return {}; + } + + std::vector<std::vector<std::shared_ptr<ChunkMeta> > > + MetadataQuerier::get_chunk_metadata_lists( + std::shared_ptr<IDeviceID> device_id, + const std::unordered_set<std::string> &field_names, + const MetaIndexNode *field_node) const { + // return io_reader_->get_chunk_metadata_lists(device_id, field_names, + // field_node); + ASSERT(false); + return {}; + } + + std::map<Path, std::vector<std::shared_ptr<ChunkMeta> > > + MetadataQuerier::get_chunk_metadata_map(const std::vector<Path> &paths) const { + // return io_reader_->get_chunk_metadata_map(paths); + ASSERT(false); + return {}; + } + + int MetadataQuerier::get_whole_file_metadata(TsFileMeta *tsfile_meta) const { + tsfile_meta = io_reader_->get_tsfile_meta(); + return common::E_OK; + } + + void MetadataQuerier::load_chunk_metadatas(const std::vector<Path> &paths) { + // io_reader_->load_chunk_metadatas(paths); + ASSERT(false); + } + + common::TSDataType MetadataQuerier::get_data_type(const Path &path) const { + ASSERT(false); + return common::INVALID_DATATYPE; + } + + std::vector<TimeRange> MetadataQuerier::convert_space_to_time_partition( + const std::vector<Path> &paths, int64_t spacePartitionStartPos, + int64_t spacePartitionEndPos) const { + ASSERT(false); + return {}; + } + + void MetadataQuerier::clear() { + } + + std::unique_ptr<DeviceMetaIterator> MetadataQuerier::device_iterator( + MetaIndexNode *root, const Filter *id_filter) { + return std::unique_ptr<DeviceMetaIterator>( + new DeviceMetaIterator(io_reader_, root, id_filter)); + } + + std::unique_ptr<DeviceMetaIterator> MetadataQuerier::device_iterator( + std::vector<MetaIndexNode *> root, const Filter *id_filter) { + return std::unique_ptr<DeviceMetaIterator>( + new DeviceMetaIterator( + io_reader_, root, id_filter)); + } + + int MetadataQuerier::load_chunk_meta( + const std::pair<IDeviceID, std::string> &key, + std::vector<ChunkMeta *> &chunk_meta_list) { + // return io_reader_->load_chunk_meta(key, chunk_meta_list); + ASSERT(false); + return common::E_NOT_SUPPORT; + } +} // end namespace storage diff --git a/cpp/src/reader/meta_data_querier.h b/cpp/src/reader/meta_data_querier.h index b4eed350..97c1b7d2 100644 --- a/cpp/src/reader/meta_data_querier.h +++ b/cpp/src/reader/meta_data_querier.h @@ -27,56 +27,59 @@ #include "imeta_data_querier.h" namespace storage { + class MetadataQuerier : public IMetadataQuerier { + public: + static constexpr int CACHED_ENTRY_NUMBER = 1000; -class MetadataQuerier : public IMetadataQuerier { - public: - static constexpr int CACHED_ENTRY_NUMBER = 1000; + enum class LocateStatus { BEFORE, IN, AFTER }; - enum class LocateStatus { BEFORE, IN, AFTER }; + explicit MetadataQuerier(TsFileIOReader *tsfile_io_reader); - explicit MetadataQuerier(TsFileIOReader* tsfile_io_reader); - ~MetadataQuerier() override; - std::vector<std::shared_ptr<ChunkMeta>> get_chunk_metadata_list( - const Path& path) const override; + ~MetadataQuerier() override; - std::vector<std::vector<std::shared_ptr<ChunkMeta>>> - get_chunk_metadata_lists( - std::shared_ptr<IDeviceID> device_id, - const std::unordered_set<std::string>& field_names, - const MetaIndexNode* field_node = nullptr) const override; + std::vector<std::shared_ptr<ChunkMeta> > get_chunk_metadata_list( + const Path &path) const override; - std::map<Path, std::vector<std::shared_ptr<ChunkMeta>>> - get_chunk_metadata_map(const std::vector<Path>& paths) const override; + std::vector<std::vector<std::shared_ptr<ChunkMeta> > > + get_chunk_metadata_lists( + std::shared_ptr<IDeviceID> device_id, + const std::unordered_set<std::string> &field_names, + const MetaIndexNode *field_node = nullptr) const override; - int get_whole_file_metadata(TsFileMeta* tsfile_meta) const override; + std::map<Path, std::vector<std::shared_ptr<ChunkMeta> > > + get_chunk_metadata_map(const std::vector<Path> &paths) const override; - void load_chunk_metadatas(const std::vector<Path>& paths) override; + int get_whole_file_metadata(TsFileMeta *tsfile_meta) const override; - common::TSDataType get_data_type(const Path& path) const override; + void load_chunk_metadatas(const std::vector<Path> &paths) override; - std::vector<TimeRange> convert_space_to_time_partition( - const std::vector<Path>& paths, int64_t spacePartitionStartPos, - int64_t spacePartitionEndPos) const override; + common::TSDataType get_data_type(const Path &path) const override; - std::unique_ptr<DeviceMetaIterator> device_iterator( - MetaIndexNode* root, const Filter* id_filter) override; + std::vector<TimeRange> convert_space_to_time_partition( + const std::vector<Path> &paths, int64_t spacePartitionStartPos, + int64_t spacePartitionEndPos) const override; - void clear() override; + std::unique_ptr<DeviceMetaIterator> device_iterator( + MetaIndexNode *root, const Filter *id_filter) override; - private: - TsFileIOReader* io_reader_; - TsFileMeta* file_metadata_; - std::unique_ptr< - common::Cache<std::string, /*Todo std::pair<IDeviceID, std::string>*/ - std::vector<std::shared_ptr<ChunkMeta>>, std::mutex>> - device_chunk_meta_cache_; + std::unique_ptr<DeviceMetaIterator> device_iterator( + std::vector<MetaIndexNode *> root, const Filter *id_filter) override; + + void clear() override; - int load_chunk_meta(const std::pair<IDeviceID, std::string>& key, - std::vector<ChunkMeta*>& chunk_meta_list); + private: + TsFileIOReader *io_reader_; + TsFileMeta *file_metadata_; + std::unique_ptr< + common::Cache<std::string, /*Todo std::pair<IDeviceID, std::string>*/ + std::vector<std::shared_ptr<ChunkMeta> >, std::mutex> > + device_chunk_meta_cache_; - static LocateStatus check_locate_status( - const std::shared_ptr<ChunkMeta>& chunk_meta, long start, long end); -}; + int load_chunk_meta(const std::pair<IDeviceID, std::string> &key, + std::vector<ChunkMeta *> &chunk_meta_list); -} // end namespace storage -#endif // READER_META_DATA_QUERIER_H \ No newline at end of file + static LocateStatus check_locate_status( + const std::shared_ptr<ChunkMeta> &chunk_meta, long start, long end); + }; +} // end namespace storage +#endif // READER_META_DATA_QUERIER_H diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index e99fd18c..1f9d0d19 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -22,147 +22,170 @@ #include "utils/db_utils.h" namespace storage { -int TableQueryExecutor::query(const std::string &table_name, - const std::vector<std::string> &columns, - Filter *time_filter, Filter *id_filter, - Filter *field_filter, ResultSet *&ret_qds) { - int ret = common::E_OK; - TsFileMeta *file_metadata = nullptr; - file_metadata = tsfile_io_reader_->get_tsfile_meta(); - common::PageArena pa; - pa.init(512, common::MOD_TSFILE_READER); - MetaIndexNode *table_root = nullptr; - std::shared_ptr<TableSchema> table_schema; - if (RET_FAIL( + int TableQueryExecutor::query(const std::string &table_name, + const std::vector<std::string> &columns, + Filter *time_filter, Filter *id_filter, + Filter *field_filter, ResultSet *&ret_qds) { + int ret = common::E_OK; + TsFileMeta *file_metadata = nullptr; + file_metadata = tsfile_io_reader_->get_tsfile_meta(); + common::PageArena pa; + pa.init(512, common::MOD_TSFILE_READER); + MetaIndexNode *table_root = nullptr; + std::shared_ptr<TableSchema> table_schema; + if (RET_FAIL( file_metadata->get_table_metaindex_node(table_name, table_root))) { - } else if (RET_FAIL( - file_metadata->get_table_schema(table_name, table_schema))) { - } + } else if (RET_FAIL( + file_metadata->get_table_schema(table_name, table_schema))) { + } - if (IS_FAIL(ret)) { - ret_qds = nullptr; - return ret; - } - std::vector<std::string> lower_case_column_names(columns); - for (auto &column : lower_case_column_names) { - to_lowercase_inplace(column); - } - std::shared_ptr<ColumnMapping> column_mapping = - std::make_shared<ColumnMapping>(); - for (size_t i = 0; i < lower_case_column_names.size(); ++i) { - column_mapping->add(lower_case_column_names[i], static_cast<int>(i), - *table_schema); - } - std::vector<common::TSDataType> data_types; - data_types.reserve(lower_case_column_names.size()); - for (size_t i = 0; i < lower_case_column_names.size(); ++i) { - auto ind = table_schema->find_column_index(lower_case_column_names[i]); - if (ind < 0) { - delete time_filter; - return common::E_COLUMN_NOT_EXIST; + if (IS_FAIL(ret)) { + ret_qds = nullptr; + return ret; } - data_types.push_back(table_schema->get_data_types()[ind]); - } - // column_mapping.add(*measurement_filter); - - auto device_task_iterator = std::unique_ptr<DeviceTaskIterator>( - new DeviceTaskIterator(columns, table_root, column_mapping, - meta_data_querier_, id_filter, table_schema)); - - std::unique_ptr<TsBlockReader> tsblock_reader; - switch (table_query_ordering_) { - case TableQueryOrdering::DEVICE: - tsblock_reader = std::unique_ptr<DeviceOrderedTsBlockReader>( - new DeviceOrderedTsBlockReader( - std::move(device_task_iterator), meta_data_querier_, - block_size_, tsfile_io_reader_, time_filter, field_filter)); - break; - case TableQueryOrdering::TIME: - default: - ret = common::E_UNSUPPORTED_ORDER; + std::vector<std::string> lower_case_column_names(columns); + for (auto &column: lower_case_column_names) { + to_lowercase_inplace(column); + } + std::shared_ptr<ColumnMapping> column_mapping = + std::make_shared<ColumnMapping>(); + for (size_t i = 0; i < lower_case_column_names.size(); ++i) { + column_mapping->add(lower_case_column_names[i], static_cast<int>(i), + *table_schema); + } + std::vector<common::TSDataType> data_types; + data_types.reserve(lower_case_column_names.size()); + for (size_t i = 0; i < lower_case_column_names.size(); ++i) { + auto ind = table_schema->find_column_index(lower_case_column_names[i]); + if (ind < 0) { + delete time_filter; + return common::E_COLUMN_NOT_EXIST; + } + data_types.push_back(table_schema->get_data_types()[ind]); + } + // column_mapping.add(*measurement_filter); + + auto device_task_iterator = std::unique_ptr<DeviceTaskIterator>( + new DeviceTaskIterator(columns, table_root, column_mapping, + meta_data_querier_, id_filter, table_schema)); + + std::unique_ptr<TsBlockReader> tsblock_reader; + switch (table_query_ordering_) { + case TableQueryOrdering::DEVICE: + tsblock_reader = std::unique_ptr<DeviceOrderedTsBlockReader>( + new DeviceOrderedTsBlockReader( + std::move(device_task_iterator), meta_data_querier_, + block_size_, tsfile_io_reader_, time_filter, field_filter)); + break; + case TableQueryOrdering::TIME: + default: + ret = common::E_UNSUPPORTED_ORDER; + } + assert(tsblock_reader != nullptr); + ret_qds = + new TableResultSet(std::move(tsblock_reader), columns, data_types); + return ret; } - assert(tsblock_reader != nullptr); - ret_qds = - new TableResultSet(std::move(tsblock_reader), columns, data_types); - return ret; -} -int TableQueryExecutor::query_on_tree( - const std::vector<std::shared_ptr<IDeviceID>> &devices, - const std::vector<std::string> &tag_columns, - const std::vector<std::string> &field_columns, Filter *time_filter, - ResultSet *&ret_qds) { - common::PageArena pa; - pa.init(512, common::MOD_TSFILE_READER); - int ret = common::E_OK; - TsFileMeta *file_meta = tsfile_io_reader_->get_tsfile_meta(); - std::vector<MetaIndexNode *> table_inodes; - for (auto const &device : devices) { - MetaIndexNode *table_inode; - if (RET_FAIL(file_meta->get_table_metaindex_node( + + int TableQueryExecutor::query_on_tree( + const std::vector<std::shared_ptr<IDeviceID> > &devices, + const std::vector<std::string> &tag_columns, + const std::vector<std::string> &field_columns, Filter *time_filter, + ResultSet *&ret_qds) { + common::PageArena pa; + pa.init(512, common::MOD_TSFILE_READER); + int ret = common::E_OK; + TsFileMeta *file_meta = tsfile_io_reader_->get_tsfile_meta(); + std::vector<MetaIndexNode *> table_inodes; + for (auto const &device: devices) { + MetaIndexNode *table_inode; + if (RET_FAIL(file_meta->get_table_metaindex_node( device->get_table_name(), table_inode))) { - }; - table_inodes.push_back(table_inode); - } + }; + table_inodes.push_back(table_inode); + } - std::vector<common::ColumnSchema> col_schema; - for (auto const &tag : tag_columns) { - col_schema.emplace_back(tag, common::TSDataType::STRING, common::ColumnCategory::TAG); - } + std::vector<common::ColumnSchema> col_schema; + for (auto const &tag: tag_columns) { + col_schema.emplace_back(tag, common::TSDataType::STRING, common::ColumnCategory::TAG); + } + + std::unordered_map<std::string, common::TSDataType> column_types_map; - std::unordered_map<std::string, common::TSDataType> column_types_map; - - for (auto const &device : devices) { - bool all_collected = true; - for (const auto& field_col : field_columns) { - if (column_types_map.find(field_col) == column_types_map.end()) { - all_collected = false; + for (auto const &device: devices) { + bool all_collected = true; + for (const auto &field_col: field_columns) { + if (column_types_map.find(field_col) == column_types_map.end()) { + all_collected = false; + break; + } + } + if (all_collected) { break; } - } - if (all_collected) { - break; - } - - std::unordered_set<std::string> measurements(field_columns.begin(), field_columns.end()); - std::vector<ITimeseriesIndex*> index(measurements.size()); - if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes(device, measurements, index, pa))) { - continue; - } - for (auto* ts_index : index) { - if (ts_index != nullptr) { - std::string measurement_name = ts_index->get_measurement_name().to_std_string(); - if (column_types_map.find(measurement_name) == column_types_map.end()) { - common::TSDataType type = ts_index->get_data_type(); - if (type == common::TSDataType::INT32 || - type == common::TSDataType::INT64 || - type == common::TSDataType::TIMESTAMP || - type == common::TSDataType::DATE) { - type = common::TSDataType::INT64; - } else if (type == common::TSDataType::FLOAT) { - type = common::TSDataType::DOUBLE; + std::unordered_set<std::string> measurements(field_columns.begin(), field_columns.end()); + std::vector<ITimeseriesIndex *> index(measurements.size()); + if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes(device, measurements, index, pa))) { + continue; + } + + for (auto *ts_index: index) { + if (ts_index != nullptr) { + std::string measurement_name = ts_index->get_measurement_name().to_std_string(); + if (column_types_map.find(measurement_name) == column_types_map.end()) { + common::TSDataType type = ts_index->get_data_type(); + if (type == common::TSDataType::INT32 || + type == common::TSDataType::INT64 || + type == common::TSDataType::TIMESTAMP || + type == common::TSDataType::DATE) { + type = common::TSDataType::INT64; + } else if (type == common::TSDataType::FLOAT) { + type = common::TSDataType::DOUBLE; + } + column_types_map[measurement_name] = type; } - column_types_map[measurement_name] = type; } } } - } - for (const auto& field_col : field_columns) { - if (column_types_map.find(field_col) != column_types_map.end()){ - col_schema.emplace_back(field_col, column_types_map[field_col], common::ColumnCategory::FIELD); - } else { - col_schema.emplace_back(field_col, common::TSDataType::INVALID_DATATYPE, common::ColumnCategory::FIELD); + for (const auto &field_col: field_columns) { + if (column_types_map.find(field_col) != column_types_map.end()) { + col_schema.emplace_back(field_col, column_types_map[field_col], common::ColumnCategory::FIELD); + } else { + col_schema.emplace_back(field_col, common::TSDataType::INVALID_DATATYPE, common::ColumnCategory::FIELD); + } } - } - TableSchema schema = TableSchema("default", col_schema); - - - -} - -void TableQueryExecutor::destroy_query_data_set(ResultSet *qds) { delete qds; } + auto schema = std::make_shared<TableSchema>("default", col_schema); + std::shared_ptr<ColumnMapping> column_mapping = std::make_shared<ColumnMapping>(); + for (size_t i = 0; i < col_schema.size(); ++i) { + column_mapping->add(col_schema[i].column_name_, i, *schema); + } + std::vector<common::TSDataType> datatypes = schema->get_data_types(); + auto device_task_iterator = std::unique_ptr<DeviceTaskIterator>( + new DeviceTaskIterator(schema->get_measurement_names(), table_inodes, column_mapping, + meta_data_querier_, nullptr, schema + )); + std::unique_ptr<TsBlockReader> tsblock_reader; + switch (table_query_ordering_) { + case TableQueryOrdering::DEVICE: + tsblock_reader = std::unique_ptr<DeviceOrderedTsBlockReader>( + new DeviceOrderedTsBlockReader( + std::move(device_task_iterator), meta_data_querier_, + block_size_, tsfile_io_reader_, time_filter, nullptr)); + break; + case TableQueryOrdering::TIME: + default: + ret = common::E_UNSUPPORTED_ORDER; + } + assert(tsblock_reader != nullptr); + ret_qds = + new TableResultSet(std::move(tsblock_reader), schema->get_measurement_names(), + schema->get_data_types()); + return ret; + } -} // end namespace storage + void TableQueryExecutor::destroy_query_data_set(ResultSet *qds) { delete qds; } +} // end namespace storage diff --git a/cpp/src/reader/task/device_task_iterator.h b/cpp/src/reader/task/device_task_iterator.h index a5079877..dc0349a5 100644 --- a/cpp/src/reader/task/device_task_iterator.h +++ b/cpp/src/reader/task/device_task_iterator.h @@ -24,39 +24,51 @@ #include "reader/task/device_query_task.h" namespace storage { + class ColumnMapping; + class DeviceQueryTask; -class ColumnMapping; -class DeviceQueryTask; - -class DeviceTaskIterator { - public: - explicit DeviceTaskIterator(std::vector<std::string> column_names, - MetaIndexNode *index_root, - std::shared_ptr<ColumnMapping> column_mapping, - IMetadataQuerier *metadata_querier, - const Filter *id_filter, - std::shared_ptr<TableSchema> table_schema) - : column_names_(column_names), - column_mapping_(column_mapping), - device_meta_iterator_( - metadata_querier->device_iterator(index_root, id_filter)), - table_schema_(table_schema) { - pa_.init(512, common::MOD_DEVICE_TASK_ITER); - } - ~DeviceTaskIterator() { pa_.destroy(); } - - bool has_next() const; - - int next(DeviceQueryTask *&task); - - private: - std::vector<std::string> column_names_; - std::shared_ptr<ColumnMapping> column_mapping_; - std::unique_ptr<DeviceMetaIterator> device_meta_iterator_; - std::shared_ptr<TableSchema> table_schema_; - common::PageArena pa_; -}; - -} // namespace storage - -#endif // READER_TASK_DEVICE_TASK_ITERATOR_H \ No newline at end of file + class DeviceTaskIterator { + public: + explicit DeviceTaskIterator(std::vector<std::string> column_names, + MetaIndexNode *index_root, + std::shared_ptr<ColumnMapping> column_mapping, + IMetadataQuerier *metadata_querier, + const Filter *id_filter, + std::shared_ptr<TableSchema> table_schema) + : column_names_(column_names), + column_mapping_(column_mapping), + device_meta_iterator_( + metadata_querier->device_iterator(index_root, id_filter)), + table_schema_(table_schema) { + pa_.init(512, common::MOD_DEVICE_TASK_ITER); + } + + DeviceTaskIterator(std::vector<std::string> column_names, + std::vector<MetaIndexNode *> index_roots, + std::shared_ptr<ColumnMapping> column_mapping, + IMetadataQuerier *metadata_querier, + const Filter *id_filter, + std::shared_ptr<TableSchema> table_schema) + : column_names_(column_names), + column_mapping_(column_mapping), + device_meta_iterator_(metadata_querier->device_iterator(index_roots, id_filter)), + table_schema_(table_schema) { + pa_.init(512, common::MOD_DEVICE_TASK_ITER); + } + + ~DeviceTaskIterator() { pa_.destroy(); } + + bool has_next() const; + + int next(DeviceQueryTask *&task); + + private: + std::vector<std::string> column_names_; + std::shared_ptr<ColumnMapping> column_mapping_; + std::unique_ptr<DeviceMetaIterator> device_meta_iterator_; + std::shared_ptr<TableSchema> table_schema_; + common::PageArena pa_; + }; +} // namespace storage + +#endif // READER_TASK_DEVICE_TASK_ITERATOR_H
