This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_support_read_tree in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit f329f4b44b066b6f4b654194f767707d20acb19d Author: ColinLee <[email protected]> AuthorDate: Sat Nov 29 22:17:36 2025 +0800 fix some bugs. --- cpp/src/cwrapper/tsfile_cwrapper.cc | 29 +++++ cpp/src/file/tsfile_io_reader.cc | 6 +- cpp/src/reader/table_query_executor.cc | 8 +- cpp/src/reader/tsfile_reader.cc | 61 +++++++++-- .../reader/tree_view/tsfile_reader_tree_test.cc | 80 +++++++++++++- python/tsfile/tsfile_py_cpp.pxd | 1 + python/tsfile/tsfile_reader.pyx | 6 ++ python/tsfile/utils.py | 118 ++++++++++++--------- 8 files changed, 239 insertions(+), 70 deletions(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index ebe8107a..0b7585b9 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -546,6 +546,35 @@ TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader, return ret; } +// TimeseriesSchema* tsfile_reader_get_all_timeseries_schemas(TsFileReader +// reader, +// uint32_t* size) { +// auto* r = static_cast<storage::TsFileReader*>(reader); +// auto table_schemas = r->get_all_devices(); +// size_t table_num = table_schemas.size(); +// TableSchema* ret = +// static_cast<TableSchema*>(malloc(sizeof(TableSchema) * table_num)); +// for (size_t i = 0; i < table_schemas.size(); i++) { +// ret[i].table_name = +// strdup(table_schemas[i]->get_table_name().c_str()); int column_num = +// table_schemas[i]->get_columns_num(); ret[i].column_num = column_num; +// ret[i].column_schemas = static_cast<ColumnSchema*>( +// malloc(column_num * sizeof(ColumnSchema))); +// auto column_schemas = table_schemas[i]->get_measurement_schemas(); +// for (int j = 0; j < column_num; j++) { +// ret[i].column_schemas[j].column_name = +// strdup(column_schemas[j]->measurement_name_.c_str()); +// ret[i].column_schemas[j].data_type = +// static_cast<TSDataType>(column_schemas[j]->data_type_); +// ret[i].column_schemas[j].column_category = +// static_cast<ColumnCategory>( +// table_schemas[i]->get_column_categories()[j]); +// } +// } +// *size = table_num; +// return ret; +// } + // delete pointer void _free_tsfile_ts_record(TsRecord* record) { if (*record != nullptr) { diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 2801d6f8..e16b6b4a 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -460,10 +460,12 @@ int TsFileIOReader::get_timeseries_indexes( if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node, measurement_index_entry, measurement_ie_end_offset))) { - } else if (RET_FAIL(do_load_timeseries_index( + } else if (do_load_timeseries_index( measurement_name, measurement_index_entry->get_offset(), measurement_ie_end_offset, pa, timeseries_indexs[idx], - is_aligned))) { + is_aligned) == E_NOT_EXIST) { + idx++; + continue; } if (is_aligned) { AlignedTimeseriesIndex* aligned_timeseries_index = diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index 3748b3db..2b6c5317 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -96,13 +96,13 @@ int TableQueryExecutor::query_on_tree( pa.init(512, common::MOD_TSFILE_READER); int ret = common::E_OK; TsFileMeta* file_meta = tsfile_io_reader_->get_tsfile_meta(); - std::vector<MetaIndexNode*> table_inodes; + std::unordered_set<MetaIndexNode*> table_inodes; for (auto const& device : devices) { MetaIndexNode* table_inode; if (RET_FAIL(file_meta->get_table_metaindex_node( device->get_table_name(), table_inode))) { }; - table_inodes.push_back(table_inode); + table_inodes.insert(table_inode); } std::vector<common::ColumnSchema> col_schema; @@ -174,9 +174,11 @@ int TableQueryExecutor::query_on_tree( column_mapping->add(col_schema[i].column_name_, i, *schema); } std::vector<common::TSDataType> datatypes = schema->get_data_types(); + std::vector<MetaIndexNode*> index_nodes(table_inodes.begin(), + table_inodes.end()); auto device_task_iterator = std::unique_ptr<DeviceTaskIterator>(new DeviceTaskIterator( - schema->get_measurement_names(), table_inodes, column_mapping, + schema->get_measurement_names(), index_nodes, column_mapping, meta_data_querier_, nullptr, schema)); std::unique_ptr<TsBlockReader> tsblock_reader; switch (table_query_ordering_) { diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index d4646856..d16e3630 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -116,29 +116,68 @@ int TsFileReader::query_table_on_tree( } auto device_names = this->get_all_device_ids(); std::vector<std::shared_ptr<IDeviceID>> device_ids; + std::unordered_set<std::string> measurement_names_set_to_query; size_t max_len = 0; - for (auto& device_name : device_names) { - std::vector<MeasurementSchema> schemas; - this->get_timeseries_schema(device_name, schemas); - for (auto schema : schemas) { - if (std::find(measurement_names.begin(), measurement_names.end(), - schema.measurement_name_) != - measurement_names.end()) { + + if (measurement_names.empty()) { + for (auto& device_name : device_names) { + std::vector<MeasurementSchema> schemas; + this->get_timeseries_schema(device_name, schemas); + device_ids.push_back(device_name); + for (auto& schema : schemas) { + measurement_names_set_to_query.insert(schema.measurement_name_); + } + if (device_name->get_segments().size() > max_len) { + max_len = device_name->get_segments().size(); + } + } + } else { + std::unordered_set<std::string> found_measurement_names; + std::unordered_set<std::string> required_measurement_names( + measurement_names.begin(), measurement_names.end()); + for (auto& device_name : device_names) { + std::vector<MeasurementSchema> schemas; + this->get_timeseries_schema(device_name, schemas); + + bool device_has_required_measurement_names = false; + for (auto& schema : schemas) { + if (required_measurement_names.find(schema.measurement_name_) != + required_measurement_names.end()) { + found_measurement_names.insert(schema.measurement_name_); + device_has_required_measurement_names = true; + } + } + if (device_has_required_measurement_names) { device_ids.push_back(device_name); if (device_name->get_segments().size() > max_len) { max_len = device_name->get_segments().size(); } - break; } } + + if (found_measurement_names.size() < + required_measurement_names.size()) { + return E_COLUMN_NOT_EXIST; + } + measurement_names_set_to_query = found_measurement_names; + } + std::vector<std::string> measurement_names_to_query; + // Get all columns. + if (measurement_names.empty() && !measurement_names_set_to_query.empty()) { + for (auto& measurement_name : measurement_names_set_to_query) { + measurement_names_to_query.push_back(measurement_name); + } + } else { + measurement_names_to_query = measurement_names; } std::vector<std::string> columns_names(max_len); for (int i = 0; i < max_len; i++) { - columns_names[i] = "l_" + std::to_string(i); + columns_names[i] = "col_" + std::to_string(i); } Filter* time_filter = new TimeBetween(star_time, end_time, false); - ret = table_query_executor_->query_on_tree( - device_ids, columns_names, measurement_names, time_filter, result_set); + ret = table_query_executor_->query_on_tree(device_ids, columns_names, + measurement_names_to_query, + time_filter, result_set); return ret; } diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc index e534a131..7f4d53dd 100644 --- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc +++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc @@ -186,7 +186,6 @@ TEST_F(TsFileTreeReaderTest, ReadTreeByTable) { break; } } - std::cout << std::endl; cnt++; } ASSERT_EQ(cnt, 10); @@ -194,6 +193,85 @@ TEST_F(TsFileTreeReaderTest, ReadTreeByTable) { reader.close(); } +TEST_F(TsFileTreeReaderTest, ReadTreeByTableIrrergular) { + TsFileTreeWriter writer(&write_file_); + std::vector<std::string> device_ids = {"root.db1.t1", + "root.db2.t1", + "root.db3.t2.t3", + "root.db3.t3", + "device", + "device.ln", + "device2.ln1.tmp", + "device3.ln2.tmp.v1.v2", + "device3.ln2.tmp.v1.v3"}; + std::vector<std::string> measurement_ids1 = {"temperature", "hudi", + "level"}; + std::vector<std::string> measurement_ids2 = {"level", "vol"}; + for (int i = 0; i < device_ids.size(); ++i) { + std::string device_id = device_ids[i]; + TsRecord record(device_id, 0); + TsRecord record1(device_id, 1); + std::vector<std::string> measurements = + (i % 2 == 0) ? measurement_ids1 : measurement_ids2; + for (auto const& measurement : measurements) { + auto schema = + new storage::MeasurementSchema(measurement, TSDataType::INT32); + ASSERT_EQ(E_OK, writer.register_timeseries(device_id, schema)); + delete schema; + record.add_point(measurement, static_cast<int64_t>(1)); + record1.add_point(measurement, static_cast<int64_t>(2)); + } + ASSERT_EQ(E_OK, writer.write(record)); + ASSERT_EQ(E_OK, writer.write(record1)); + } + writer.flush(); + writer.close(); + + TsFileReader reader; + reader.open(file_name_); + ResultSet* result; + int ret = reader.query_table_on_tree({"level", "hudi"}, INT64_MIN, + INT64_MAX, result); + ASSERT_EQ(ret, E_OK); + + auto* table_result_set = (storage::TableResultSet*)result; + bool has_next = false; + int num = table_result_set->get_metadata()->get_column_count(); + ASSERT_EQ(num, 6); + int cnt = 0; + int null_count = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + auto t = table_result_set->get_value<int64_t>(1); + ASSERT_TRUE(t == 0 || t == 1); + std::string key = ""; + std::string value = ""; + for (int i = 1; i < num + 1; ++i) { + if (table_result_set->is_null(i)) { + null_count++; + continue; + } + switch (table_result_set->get_metadata()->get_column_type(i)) { + case INT64: + ASSERT_TRUE(table_result_set->get_value<int64_t>(i) == 1 || + table_result_set->get_value<int64_t>(i) == 0); + break; + case INT32: + ASSERT_TRUE(table_result_set->get_value<int32_t>(i) == 1 || + table_result_set->get_value<int32_t>(i) == 2); + break; + default: + break; + } + } + cnt++; + std::cout << std::endl; + } + ASSERT_EQ(null_count, 24); + ASSERT_EQ(cnt, 18); + reader.destroy_query_data_set(result); + reader.close(); +} + TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) { TsFileTreeWriter writer(&write_file_); std::vector<std::string> device_ids = {"device_1", "device_2", "device_3"}; diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 6ed03838..584ff0b1 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -55,5 +55,6 @@ cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, objec int64_t end_time) cdef public api object get_table_schema(TsFileReader reader, object table_name) cdef public api object get_all_table_schema(TsFileReader reader) +# cdef public api object get_all_timeseries_schema(TsFileReader reader) cpdef public api object get_tsfile_config() cpdef public api void set_tsfile_config(dict new_config) \ No newline at end of file diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 7e29213b..f8ed6adc 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -339,6 +339,12 @@ cdef class TsFileReaderPy: """ return get_all_table_schema(self.reader) + # def get_all_timeseries_schemas(self): + # """ + # Get all timeseries schemas + # """ + # return get_all_timeseries_schema(self.reader) + def close(self): """ Close TsFile Reader, if reader has result sets, invalid them. diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index f1fd51e7..e2baed5f 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -15,68 +15,80 @@ # specific language governing permissions and limitations # under the License. # - +import numpy as np import pandas as pd +from typing import Iterator, Union +from tsfile.tsfile_reader import TsFileReaderPy from tsfile.exceptions import TableNotExistError, ColumnNotExistError -from tsfile.tsfile_reader import TsFileReaderPy def to_dataframe(file_path: str, - table_name: str = None, - column_names: list[str] = None, - max_row_num: int = None) -> pd.DataFrame: - with TsFileReaderPy(file_path) as reader: - total_rows = 0 - table_schema = reader.get_all_table_schemas() - - # 判断是树模型还是表模型 - is_tree_model = len(table_schema) == 0 - - if is_tree_model: - # 树模型需要明确指定列名 - if column_names is None: - raise ValueError("树模型需要明确指定 column_names 参数") - else: - # 表模型的处理逻辑 - if table_name is None: - # get the first table name by default - table_name, columns = next(iter(table_schema.items())) + table_name: str | None = None, + column_names: list[str] | None = None, + start_time: int | None = None, + end_time: int | None = None, + max_row_num: int | None = None, + as_iterator: bool = False) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]: + + def _gen() -> Iterator[pd.DataFrame]: + _table_name = table_name + _column_names = column_names + _start_time = start_time if start_time is not None else np.iinfo(np.int64).min + _end_time = end_time if end_time is not None else np.iinfo(np.int64).max + + with TsFileReaderPy(file_path) as reader: + total_rows = 0 + table_schema = reader.get_all_table_schemas() + + is_tree_model = len(table_schema) == 0 + + if is_tree_model: + if _column_names is None: + print("columns name is None, return all columns") else: - if table_name not in table_schema: - raise TableNotExistError(table_name) - columns = table_schema[table_name] + if _table_name is None: + _table_name, columns = next(iter(table_schema.items())) + else: + if _table_name not in table_schema: + raise TableNotExistError(_table_name) + columns = table_schema[_table_name] + + column_names_in_file = columns.get_column_names() - column_names_in_file = columns.get_column_names() + if _column_names is not None: + for column in _column_names: + if column not in column_names_in_file: + raise ColumnNotExistError(column) + else: + _column_names = column_names_in_file - if column_names is not None: - for column in column_names: - if column not in column_names_in_file: - raise ColumnNotExistError(column) + if is_tree_model: + if _column_names is None: + _column_names = [] + query_result = reader.query_table_on_tree(_column_names, _start_time, _end_time) else: - column_names = column_names_in_file + query_result = reader.query_table(_table_name, _column_names, _start_time, _end_time) - # 统一处理查询结果 - df_list: list[pd.DataFrame] = [] - - if is_tree_model: - query_result = reader.query_table_on_tree(column_names) - else: - query_result = reader.query_table(table_name, column_names) - - with query_result as result: - while result.next(): - if max_row_num is not None: - remaining_rows = max_row_num - total_rows - if remaining_rows <= 0: - break + with query_result as result: + while result.next(): + if max_row_num is not None: + remaining_rows = max_row_num - total_rows + if remaining_rows <= 0: + break + else: + batch_rows = min(remaining_rows, 1024) + df = result.read_data_frame(batch_rows) + total_rows += len(df) else: - batch_rows = min(remaining_rows, 1024) - df = result.read_data_frame(batch_rows) - total_rows += len(df) - else: - df = result.read_data_frame() - df_list.append(df) - - df = pd.concat(df_list, ignore_index=True) - return df + df = result.read_data_frame() + yield df + + if as_iterator: + return _gen() + else: + df_list = list(_gen()) + if df_list: + return pd.concat(df_list, ignore_index=True) + else: + return pd.DataFrame() \ No newline at end of file
