This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch colin_support_read_tree
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit f329f4b44b066b6f4b654194f767707d20acb19d
Author: ColinLee <[email protected]>
AuthorDate: Sat Nov 29 22:17:36 2025 +0800

    fix some bugs.
---
 cpp/src/cwrapper/tsfile_cwrapper.cc                |  29 +++++
 cpp/src/file/tsfile_io_reader.cc                   |   6 +-
 cpp/src/reader/table_query_executor.cc             |   8 +-
 cpp/src/reader/tsfile_reader.cc                    |  61 +++++++++--
 .../reader/tree_view/tsfile_reader_tree_test.cc    |  80 +++++++++++++-
 python/tsfile/tsfile_py_cpp.pxd                    |   1 +
 python/tsfile/tsfile_reader.pyx                    |   6 ++
 python/tsfile/utils.py                             | 118 ++++++++++++---------
 8 files changed, 239 insertions(+), 70 deletions(-)

diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index ebe8107a..0b7585b9 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -546,6 +546,35 @@ TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader,
     return ret;
 }
 
+// TimeseriesSchema* tsfile_reader_get_all_timeseries_schemas(TsFileReader
+// reader,
+//                                                  uint32_t* size) {
+//     auto* r = static_cast<storage::TsFileReader*>(reader);
+//     auto table_schemas = r->get_all_devices();
+//     size_t table_num = table_schemas.size();
+//     TableSchema* ret =
+//         static_cast<TableSchema*>(malloc(sizeof(TableSchema) * table_num));
+//     for (size_t i = 0; i < table_schemas.size(); i++) {
+//         ret[i].table_name =
+//         strdup(table_schemas[i]->get_table_name().c_str()); int column_num =
+//         table_schemas[i]->get_columns_num(); ret[i].column_num = column_num;
+//         ret[i].column_schemas = static_cast<ColumnSchema*>(
+//             malloc(column_num * sizeof(ColumnSchema)));
+//         auto column_schemas = table_schemas[i]->get_measurement_schemas();
+//         for (int j = 0; j < column_num; j++) {
+//             ret[i].column_schemas[j].column_name =
+//                 strdup(column_schemas[j]->measurement_name_.c_str());
+//             ret[i].column_schemas[j].data_type =
+//                 static_cast<TSDataType>(column_schemas[j]->data_type_);
+//             ret[i].column_schemas[j].column_category =
+//                 static_cast<ColumnCategory>(
+//                     table_schemas[i]->get_column_categories()[j]);
+//         }
+//     }
+//     *size = table_num;
+//     return ret;
+// }
+
 // delete pointer
 void _free_tsfile_ts_record(TsRecord* record) {
     if (*record != nullptr) {
diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc
index 2801d6f8..e16b6b4a 100644
--- a/cpp/src/file/tsfile_io_reader.cc
+++ b/cpp/src/file/tsfile_io_reader.cc
@@ -460,10 +460,12 @@ int TsFileIOReader::get_timeseries_indexes(
         if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node,
                                                   measurement_index_entry,
                                                   measurement_ie_end_offset))) {
-        } else if (RET_FAIL(do_load_timeseries_index(
+        } else if (do_load_timeseries_index(
                        measurement_name, measurement_index_entry->get_offset(),
                        measurement_ie_end_offset, pa, timeseries_indexs[idx],
-                       is_aligned))) {
+                       is_aligned) == E_NOT_EXIST) {
+            idx++;
+            continue;
         }
         if (is_aligned) {
             AlignedTimeseriesIndex* aligned_timeseries_index =
diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc
index 3748b3db..2b6c5317 100644
--- a/cpp/src/reader/table_query_executor.cc
+++ b/cpp/src/reader/table_query_executor.cc
@@ -96,13 +96,13 @@ int TableQueryExecutor::query_on_tree(
     pa.init(512, common::MOD_TSFILE_READER);
     int ret = common::E_OK;
     TsFileMeta* file_meta = tsfile_io_reader_->get_tsfile_meta();
-    std::vector<MetaIndexNode*> table_inodes;
+    std::unordered_set<MetaIndexNode*> table_inodes;
     for (auto const& device : devices) {
         MetaIndexNode* table_inode;
         if (RET_FAIL(file_meta->get_table_metaindex_node(
                 device->get_table_name(), table_inode))) {
         };
-        table_inodes.push_back(table_inode);
+        table_inodes.insert(table_inode);
     }
 
     std::vector<common::ColumnSchema> col_schema;
@@ -174,9 +174,11 @@ int TableQueryExecutor::query_on_tree(
         column_mapping->add(col_schema[i].column_name_, i, *schema);
     }
     std::vector<common::TSDataType> datatypes = schema->get_data_types();
+    std::vector<MetaIndexNode*> index_nodes(table_inodes.begin(),
+                                            table_inodes.end());
     auto device_task_iterator =
         std::unique_ptr<DeviceTaskIterator>(new DeviceTaskIterator(
-            schema->get_measurement_names(), table_inodes, column_mapping,
+            schema->get_measurement_names(), index_nodes, column_mapping,
             meta_data_querier_, nullptr, schema));
     std::unique_ptr<TsBlockReader> tsblock_reader;
     switch (table_query_ordering_) {
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index d4646856..d16e3630 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -116,29 +116,68 @@ int TsFileReader::query_table_on_tree(
     }
     auto device_names = this->get_all_device_ids();
     std::vector<std::shared_ptr<IDeviceID>> device_ids;
+    std::unordered_set<std::string> measurement_names_set_to_query;
     size_t max_len = 0;
-    for (auto& device_name : device_names) {
-        std::vector<MeasurementSchema> schemas;
-        this->get_timeseries_schema(device_name, schemas);
-        for (auto schema : schemas) {
-            if (std::find(measurement_names.begin(), measurement_names.end(),
-                          schema.measurement_name_) !=
-                measurement_names.end()) {
+
+    if (measurement_names.empty()) {
+        for (auto& device_name : device_names) {
+            std::vector<MeasurementSchema> schemas;
+            this->get_timeseries_schema(device_name, schemas);
+            device_ids.push_back(device_name);
+            for (auto& schema : schemas) {
+                measurement_names_set_to_query.insert(schema.measurement_name_);
+            }
+            if (device_name->get_segments().size() > max_len) {
+                max_len = device_name->get_segments().size();
+            }
+        }
+    } else {
+        std::unordered_set<std::string> found_measurement_names;
+        std::unordered_set<std::string> required_measurement_names(
+            measurement_names.begin(), measurement_names.end());
+        for (auto& device_name : device_names) {
+            std::vector<MeasurementSchema> schemas;
+            this->get_timeseries_schema(device_name, schemas);
+
+            bool device_has_required_measurement_names = false;
+            for (auto& schema : schemas) {
+                if (required_measurement_names.find(schema.measurement_name_) !=
+                    required_measurement_names.end()) {
+                    found_measurement_names.insert(schema.measurement_name_);
+                    device_has_required_measurement_names = true;
+                }
+            }
+            if (device_has_required_measurement_names) {
                 device_ids.push_back(device_name);
                 if (device_name->get_segments().size() > max_len) {
                     max_len = device_name->get_segments().size();
                 }
-                break;
             }
         }
+
+        if (found_measurement_names.size() <
+            required_measurement_names.size()) {
+            return E_COLUMN_NOT_EXIST;
+        }
+        measurement_names_set_to_query = found_measurement_names;
+    }
+    std::vector<std::string> measurement_names_to_query;
+    // Get all columns.
+    if (measurement_names.empty() && !measurement_names_set_to_query.empty()) {
+        for (auto& measurement_name : measurement_names_set_to_query) {
+            measurement_names_to_query.push_back(measurement_name);
+        }
+    } else {
+        measurement_names_to_query = measurement_names;
     }
     std::vector<std::string> columns_names(max_len);
     for (int i = 0; i < max_len; i++) {
-        columns_names[i] = "l_" + std::to_string(i);
+        columns_names[i] = "col_" + std::to_string(i);
     }
     Filter* time_filter = new TimeBetween(star_time, end_time, false);
-    ret = table_query_executor_->query_on_tree(
-        device_ids, columns_names, measurement_names, time_filter, result_set);
+    ret = table_query_executor_->query_on_tree(device_ids, columns_names,
+                                               measurement_names_to_query,
+                                               time_filter, result_set);
     return ret;
 }
 
diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
index e534a131..7f4d53dd 100644
--- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
+++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
@@ -186,7 +186,6 @@ TEST_F(TsFileTreeReaderTest, ReadTreeByTable) {
                     break;
             }
         }
-        std::cout << std::endl;
         cnt++;
     }
     ASSERT_EQ(cnt, 10);
@@ -194,6 +193,85 @@ TEST_F(TsFileTreeReaderTest, ReadTreeByTable) {
     reader.close();
 }
 
+TEST_F(TsFileTreeReaderTest, ReadTreeByTableIrrergular) {
+    TsFileTreeWriter writer(&write_file_);
+    std::vector<std::string> device_ids = {"root.db1.t1",
+                                           "root.db2.t1",
+                                           "root.db3.t2.t3",
+                                           "root.db3.t3",
+                                           "device",
+                                           "device.ln",
+                                           "device2.ln1.tmp",
+                                           "device3.ln2.tmp.v1.v2",
+                                           "device3.ln2.tmp.v1.v3"};
+    std::vector<std::string> measurement_ids1 = {"temperature", "hudi",
+                                                 "level"};
+    std::vector<std::string> measurement_ids2 = {"level", "vol"};
+    for (int i = 0; i < device_ids.size(); ++i) {
+        std::string device_id = device_ids[i];
+        TsRecord record(device_id, 0);
+        TsRecord record1(device_id, 1);
+        std::vector<std::string> measurements =
+            (i % 2 == 0) ? measurement_ids1 : measurement_ids2;
+        for (auto const& measurement : measurements) {
+            auto schema =
+                new storage::MeasurementSchema(measurement, TSDataType::INT32);
+            ASSERT_EQ(E_OK, writer.register_timeseries(device_id, schema));
+            delete schema;
+            record.add_point(measurement, static_cast<int64_t>(1));
+            record1.add_point(measurement, static_cast<int64_t>(2));
+        }
+        ASSERT_EQ(E_OK, writer.write(record));
+        ASSERT_EQ(E_OK, writer.write(record1));
+    }
+    writer.flush();
+    writer.close();
+
+    TsFileReader reader;
+    reader.open(file_name_);
+    ResultSet* result;
+    int ret = reader.query_table_on_tree({"level", "hudi"}, INT64_MIN,
+                                         INT64_MAX, result);
+    ASSERT_EQ(ret, E_OK);
+
+    auto* table_result_set = (storage::TableResultSet*)result;
+    bool has_next = false;
+    int num = table_result_set->get_metadata()->get_column_count();
+    ASSERT_EQ(num, 6);
+    int cnt = 0;
+    int null_count = 0;
+    while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+        auto t = table_result_set->get_value<int64_t>(1);
+        ASSERT_TRUE(t == 0 || t == 1);
+        std::string key = "";
+        std::string value = "";
+        for (int i = 1; i < num + 1; ++i) {
+            if (table_result_set->is_null(i)) {
+                null_count++;
+                continue;
+            }
+            switch (table_result_set->get_metadata()->get_column_type(i)) {
+                case INT64:
+                    ASSERT_TRUE(table_result_set->get_value<int64_t>(i) == 1 ||
+                                table_result_set->get_value<int64_t>(i) == 0);
+                    break;
+                case INT32:
+                    ASSERT_TRUE(table_result_set->get_value<int32_t>(i) == 1 ||
+                                table_result_set->get_value<int32_t>(i) == 2);
+                    break;
+                default:
+                    break;
+            }
+        }
+        cnt++;
+        std::cout << std::endl;
+    }
+    ASSERT_EQ(null_count, 24);
+    ASSERT_EQ(cnt, 18);
+    reader.destroy_query_data_set(result);
+    reader.close();
+}
+
 TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
     TsFileTreeWriter writer(&write_file_);
     std::vector<std::string> device_ids = {"device_1", "device_2", "device_3"};
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index 6ed03838..584ff0b1 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -55,5 +55,6 @@ cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, objec
                                                       int64_t end_time)
 cdef public api object get_table_schema(TsFileReader reader, object table_name)
 cdef public api object get_all_table_schema(TsFileReader reader)
+# cdef public api object get_all_timeseries_schema(TsFileReader reader)
 cpdef public api object get_tsfile_config()
 cpdef public api void set_tsfile_config(dict new_config)
\ No newline at end of file
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index 7e29213b..f8ed6adc 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -339,6 +339,12 @@ cdef class TsFileReaderPy:
         """
         return get_all_table_schema(self.reader)
 
+    # def get_all_timeseries_schemas(self):
+    #     """
+    #     Get all timeseries schemas
+    #     """
+    #     return get_all_timeseries_schema(self.reader)
+
     def close(self):
         """
         Close TsFile Reader, if reader has result sets, invalid them.
diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py
index f1fd51e7..e2baed5f 100644
--- a/python/tsfile/utils.py
+++ b/python/tsfile/utils.py
@@ -15,68 +15,80 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
+import numpy as np
 import pandas as pd
+from typing import Iterator, Union
+from tsfile.tsfile_reader import TsFileReaderPy
 
 from tsfile.exceptions import TableNotExistError, ColumnNotExistError
-from tsfile.tsfile_reader import TsFileReaderPy
 
 
 def to_dataframe(file_path: str,
-                 table_name: str = None,
-                 column_names: list[str] = None,
-                 max_row_num: int = None) -> pd.DataFrame:
-    with TsFileReaderPy(file_path) as reader:
-        total_rows = 0
-        table_schema = reader.get_all_table_schemas()
-        
-        # 判断是树模型还是表模型
-        is_tree_model = len(table_schema) == 0
-        
-        if is_tree_model:
-            # 树模型需要明确指定列名
-            if column_names is None:
-                raise ValueError("树模型需要明确指定 column_names 参数")
-        else:
-            # 表模型的处理逻辑
-            if table_name is None:
-                # get the first table name by default
-                table_name, columns = next(iter(table_schema.items()))
+                 table_name: str | None = None,
+                 column_names: list[str] | None = None,
+                 start_time: int | None = None,
+                 end_time: int | None = None,
+                 max_row_num: int | None = None,
+                 as_iterator: bool = False) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
+
+    def _gen() -> Iterator[pd.DataFrame]:
+        _table_name = table_name
+        _column_names = column_names
+        _start_time = start_time if start_time is not None else np.iinfo(np.int64).min
+        _end_time = end_time if end_time is not None else np.iinfo(np.int64).max
+
+        with TsFileReaderPy(file_path) as reader:
+            total_rows = 0
+            table_schema = reader.get_all_table_schemas()
+
+            is_tree_model = len(table_schema) == 0
+
+            if is_tree_model:
+                if _column_names is None:
+                    print("columns name is None, return all columns")
             else:
-                if table_name not in table_schema:
-                    raise TableNotExistError(table_name)
-                columns = table_schema[table_name]
+                if _table_name is None:
+                    _table_name, columns = next(iter(table_schema.items()))
+                else:
+                    if _table_name not in table_schema:
+                        raise TableNotExistError(_table_name)
+                    columns = table_schema[_table_name]
+
+                column_names_in_file = columns.get_column_names()
 
-            column_names_in_file = columns.get_column_names()
+                if _column_names is not None:
+                    for column in _column_names:
+                        if column not in column_names_in_file:
+                            raise ColumnNotExistError(column)
+                else:
+                    _column_names = column_names_in_file
 
-            if column_names is not None:
-                for column in column_names:
-                    if column not in column_names_in_file:
-                        raise ColumnNotExistError(column)
+            if is_tree_model:
+                if _column_names is None:
+                    _column_names = []
+                query_result = reader.query_table_on_tree(_column_names, _start_time, _end_time)
             else:
-                column_names = column_names_in_file
+                query_result = reader.query_table(_table_name, _column_names, _start_time, _end_time)
 
-        # 统一处理查询结果
-        df_list: list[pd.DataFrame] = []
-        
-        if is_tree_model:
-            query_result = reader.query_table_on_tree(column_names)
-        else:
-            query_result = reader.query_table(table_name, column_names)
-        
-        with query_result as result:
-            while result.next():
-                if max_row_num is not None:
-                    remaining_rows = max_row_num - total_rows
-                    if remaining_rows <= 0:
-                        break
+            with query_result as result:
+                while result.next():
+                    if max_row_num is not None:
+                        remaining_rows = max_row_num - total_rows
+                        if remaining_rows <= 0:
+                            break
+                        else:
+                            batch_rows = min(remaining_rows, 1024)
+                        df = result.read_data_frame(batch_rows)
+                        total_rows += len(df)
                     else:
-                        batch_rows = min(remaining_rows, 1024)
-                    df = result.read_data_frame(batch_rows)
-                    total_rows += len(df)
-                else:
-                    df = result.read_data_frame()
-                df_list.append(df)
-        
-        df = pd.concat(df_list, ignore_index=True)
-        return df
+                        df = result.read_data_frame()
+                    yield df
+
+    if as_iterator:
+        return _gen()
+    else:
+        df_list = list(_gen())
+        if df_list:
+            return pd.concat(df_list, ignore_index=True)
+        else:
+            return pd.DataFrame()
\ No newline at end of file

Reply via email to