This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch iotdb in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 5c3fb4d1a30f73cf5ae556eed700369e1f1e7646 Author: Colin Lee <[email protected]> AuthorDate: Wed Jul 3 16:23:15 2024 +0800 Python wrapper. (#112) * Add TsFile Python. * Fix macos and windows build * fix import error in windows python over 3.8. * refine python code. --------- Co-authored-by: Haonan <[email protected]> --- .github/workflows/unit-test.yml | 13 +- .gitignore | 11 ++ cpp/src/cwrapper/TsFile-cwrapper.cc | 16 +- cpp/src/reader/tsfile_executor.cc | 4 + pom.xml | 37 ++++ python/README.md | 66 +++++++ python/examlpes.py | 82 +++++++++ python/pom.xml | 127 +++++++++++++ python/requirements.txt | 24 +++ python/setup.py | 125 +++++++++++++ python/test.py | 171 ++++++++++++++++++ python/tsfile/__init__.py | 18 ++ python/tsfile/tsfile.pxd | 104 +++++++++++ python/tsfile/tsfile.py | 145 +++++++++++++++ python/tsfile/tsfile_pywrapper.pyx | 348 ++++++++++++++++++++++++++++++++++++ 15 files changed, 1279 insertions(+), 12 deletions(-) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index afc521f8..a529989d 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -73,13 +73,16 @@ jobs: core.setOutput('platform_suffix', ``) } - - name: Install python packages - if: ${{ matrix.os == 'ubuntu-latest'}} - run: pip3 install cython pandas numpy - + # Use python 3.12 to avoid Cython files don't compile on Mingw-w64 64-bit + # https://bugs.python.org/issue40167 + - name: Set up python 3.12 for windows + if: ${{ matrix.os == 'windows-latest'}} + uses: actions/setup-python@v5 + with: + python-version: '3.12' # Run the actual maven build including all unit- and integration-tests. 
- name: Build and test with Maven (All others) shell: bash run: | - ./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-java,with-cpp clean verify + ./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-java,with-cpp,with-python clean verify diff --git a/.gitignore b/.gitignore index 77f4d73a..944ecf5a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,14 @@ docs/node_modules/ docs/src/.vuepress/.cache/ docs/src/.vuepress/.temp/ docs/src/.vuepress/dist/ + +# python files +python/build +python/tsfile/__pycache__ +python/tsfile/*so* +python/tsfile/*dll* +python/tsfile/*dylib* +python/tsfile/*.h +python/tsfile/*.cpp +python/data +python/venv/* diff --git a/cpp/src/cwrapper/TsFile-cwrapper.cc b/cpp/src/cwrapper/TsFile-cwrapper.cc index d2153a1d..df38ce72 100644 --- a/cpp/src/cwrapper/TsFile-cwrapper.cc +++ b/cpp/src/cwrapper/TsFile-cwrapper.cc @@ -206,10 +206,6 @@ ErrorCode tsfile_register_table_column(CTsFileWriter writer, get_datatype(schema->column_def), get_data_encoding(schema->column_def), get_data_compression(schema->column_def)); - std::cout << "register table column name" << table_name << std::endl; - std::cout << "register column name" << schema->name << std::endl; - std::cout << "register column type" << get_datatype(schema->column_def) - << std::endl; return ret; } @@ -644,7 +640,9 @@ QueryDataRet ts_reader_begin_end(CTsFileReader reader, const char* table_name, ret->data = qds; ret->column_num = column_num; ret->column_names = (char**)malloc(column_num * sizeof(char*)); - memcpy(ret->column_names, columns_name, column_num * sizeof(char*)); + for (int i = 0; i < column_num; i++) { + ret->column_names[i] = strdup(columns_name[i]); + } storage::QueryExpression::destory(query_expr); return ret; } @@ -666,7 +664,9 @@ QueryDataRet ts_reader_read(CTsFileReader reader, const char* table_name, ret->data = qds; ret->column_names = (char**)malloc(column_num * sizeof(char*)); ret->column_num = column_num; - memcpy(ret->column_names, 
columns_name, column_num * sizeof(char*)); + for (int i = 0; i < column_num; i++) { + ret->column_names[i] = strdup(columns_name[i]); + } storage::QueryExpression::destory(query_expr); return ret; } @@ -674,6 +674,9 @@ QueryDataRet ts_reader_read(CTsFileReader reader, const char* table_name, ErrorCode destory_query_dataret(QueryDataRet data) { storage::QueryDataSet* qds = (storage::QueryDataSet*)data->data; delete qds; + for (int i = 0; i < data->column_num; i++) { + free(data->column_names[i]); + } free(data->column_names); free(data); return E_OK; @@ -699,7 +702,6 @@ DataResult* ts_next(QueryDataRet data, int expect_line_count) { get_schema_info(field->type_)); } init_tablet = true; - std::cout << "init finished" << std::endl; } for (int col = 0; col < column_num; col++) { storage::Field* field = record->get_field(col); diff --git a/cpp/src/reader/tsfile_executor.cc b/cpp/src/reader/tsfile_executor.cc index e2be78af..757a8e23 100644 --- a/cpp/src/reader/tsfile_executor.cc +++ b/cpp/src/reader/tsfile_executor.cc @@ -74,10 +74,14 @@ int TsFileExecutor::execute(QueryExpression *query_expr, } if (regular_expr == nullptr || regular_expr->type_ == GLOBALTIME_EXPR) { +#if DEBUG_SE std::cout << "got into 1 path" << std::endl; +#endif return execute_may_with_global_timefilter(query_exprs_, ret_qds); } else { +#if DEBUG_SE std::cout << "got into 2 path" << std::endl; +#endif // no filter or just global time filter return execute_with_timegenerator(query_exprs_, ret_qds); } diff --git a/pom.xml b/pom.xml index 9c0e1432..4927e735 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,7 @@ <exclude>**/.clang-format</exclude> <!-- generated by cython--> <exclude>**/tsfile/tsfile_pywrapper.cpp</exclude> + <exclude>**/venv/**</exclude> </excludes> </configuration> </plugin> @@ -191,6 +192,11 @@ <artifactId>build-helper-maven-plugin</artifactId> <version>3.5.0</version> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + 
<version>1.6.0</version> + </plugin> <plugin> <groupId>org.apache.felix</groupId> <artifactId>maven-bundle-plugin</artifactId> @@ -579,6 +585,13 @@ <module>cpp</module> </modules> </profile> + <!-- Build the python version of TsFile --> + <profile> + <id>with-python</id> + <modules> + <module>python</module> + </modules> + </profile> <profile> <id>.java-9-and-above</id> <activation> @@ -653,6 +666,9 @@ <os.suffix>linux</os.suffix> <os.classifier>linux-x86_64</os.classifier> <cmake.generator>Unix Makefiles</cmake.generator> + <python.venv.bin>venv/bin/</python.venv.bin> + <python.exe.bin>python3</python.exe.bin> + <python.compiler.argument> </python.compiler.argument> </properties> </profile> <!-- Profile for linux amd64 (mainly AMD Processors) (Self-Enabling) --> @@ -669,6 +685,9 @@ <os.suffix>linux</os.suffix> <os.classifier>linux-amd64</os.classifier> <cmake.generator>Unix Makefiles</cmake.generator> + <python.venv.bin>venv/bin/</python.venv.bin> + <python.exe.bin>python3</python.exe.bin> + <python.compiler.argument> </python.compiler.argument> </properties> </profile> <!-- Profile for linux aarch64 (mainly newer Mac or Raspberry PI Processors) (Self-Enabling) --> @@ -685,6 +704,9 @@ <os.suffix>linux</os.suffix> <os.classifier>linux-${os.arch}</os.classifier> <cmake.generator>Unix Makefiles</cmake.generator> + <python.venv.bin>venv/bin/</python.venv.bin> + <python.exe.bin>python3</python.exe.bin> + <python.compiler.argument> </python.compiler.argument> </properties> </profile> <!-- Profile for mac x86_64 (mainly Intel Processors) (Self-Enabling) --> @@ -700,6 +722,9 @@ <os.suffix>mac</os.suffix> <os.classifier>mac-x86_64</os.classifier> <cmake.generator>Unix Makefiles</cmake.generator> + <python.venv.bin>venv/bin/</python.venv.bin> + <python.exe.bin>python3</python.exe.bin> + <python.compiler.argument> </python.compiler.argument> </properties> </profile> <!-- Profile for mac aarch64 (mainly AMD Processors) (Self-Enabling) --> @@ -715,6 +740,9 @@ 
<os.suffix>mac</os.suffix> <os.classifier>mac-aarch64</os.classifier> <cmake.generator>Unix Makefiles</cmake.generator> + <python.venv.bin>venv/bin/</python.venv.bin> + <python.exe.bin>python3</python.exe.bin> + <python.compiler.argument> </python.compiler.argument> </properties> </profile> <!-- profile for windows x86_64 (mainly Intel Processors) (Self-Enabling) --> @@ -732,6 +760,9 @@ <cmake.generator>MinGW Makefiles</cmake.generator> <!-- The generated code relied on Boost and that relies on VS and can't be built with MinGW --> <!--cmake.generator>Visual Studio 17 2022</cmake.generator--> + <python.venv.bin>venv/Scripts/</python.venv.bin> + <python.exe.bin>python</python.exe.bin> + <python.compiler.argument>--compiler=mingw32</python.compiler.argument> </properties> </profile> <!-- profile for windows amd64 (mainly AMD Processors) (Self-Enabling) --> @@ -749,6 +780,9 @@ <cmake.generator>MinGW Makefiles</cmake.generator> <!-- The generated code relied on Boost and that relies on VS and can't be built with MinGW --> <!--cmake.generator>Visual Studio 17 2022</cmake.generator--> + <python.venv.bin>venv/Scripts/</python.venv.bin> + <python.exe.bin>python</python.exe.bin> + <python.compiler.argument>--compiler=mingw32</python.compiler.argument> </properties> </profile> <!-- profile for windows aarch64 (mainly newer Mac or Raspberry PI Processors) (Self-Enabling) --> @@ -766,6 +800,9 @@ <cmake.generator>MinGW Makefiles</cmake.generator> <!-- The generated code relied on Boost and that relies on VS and can't be built with MinGW --> <!--cmake.generator>Visual Studio 17 2022</cmake.generator--> + <python.venv.bin>venv/Scripts/</python.venv.bin> + <python.exe.bin>python</python.exe.bin> + <python.compiler.argument>--compiler=mingw32</python.compiler.argument> </properties> </profile> <!-- Little helper profile that will disable running the cmake tests when the maven tests are being skipped --> diff --git a/python/README.md b/python/README.md new file mode 100644 index 
00000000..0e9ff438 --- /dev/null +++ b/python/README.md @@ -0,0 +1,66 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +# TsFile Python Document + +<pre> +___________ ___________.__.__ +\__ ___/____\_ _____/|__| | ____ + | | / ___/| __) | | | _/ __ \ + | | \___ \ | \ | | |_\ ___/ + |____|/____ >\___ / |__|____/\___ > version 1.0.0 + \/ \/ \/ +</pre> + + +## Introduction + +This directory contains the Python implementation of TsFile. The Python version is built on the CPP version and uses the Cython package to integrate TsFile's read and write capabilities into the Python environment. Users can read and write TsFile as easily as they use read_csv and write_csv in Pandas. + +The source code can be found in the `./tsfile` directory. Files ending with `.pyx` and `.pyd` are wrapper code written in Cython. The `tsfile/tsfile.py` defines some user interfaces. You can find some examples of reading and writing in the `.examples/examples.py`. + + +## How to make contributions + +Using pylint to check Python code is recommended. However, there is no suitable style checking tool for Cython code, and this part of the code should be consistent with the Python style required by pylint. 
+ +**Feature List** +- [ ] In pywrapper, invoke the batch reading interface implemented in CPP version of TsFile. +- [ ] Supports writing multiple DataFrames into one single TsFile. + + + +## Build + +Before constructing Python version of TsFile, it is necessary to build [CPP version of TsFile](../cpp/README.md) first, because Python version of TsFile relies on the shared library files provided by CPP version of TsFile. + +Build by mvn in root directory: + +```sh +mvn -P with-cpp,with-python clean verify +``` + +Build by python command: + +```sh +python setup.py build_ext --inplace +``` + diff --git a/python/examlpes.py b/python/examlpes.py new file mode 100644 index 00000000..00ced4eb --- /dev/null +++ b/python/examlpes.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +import numpy as np +import pandas as pd +import os + +import tsfile as ts + + +# test writing data +data_dir = os.path.join(os.path.dirname(__file__), "test.tsfile") +DEVICE_NAME = "test_table" + +# 1000 rows data +time = np.arange(1, 1001, dtype=np.int64) +level = np.linspace(2000, 3000, num=1000, dtype=np.float32) +num = np.arange(10000, 11000, dtype=np.int64) +df = pd.DataFrame({"Time": time, "level": level, "num": num}) + +if os.path.exists(data_dir): + os.remove(data_dir) +ts.write_tsfile(data_dir, DEVICE_NAME, df) + + +# read data we already wrote +# with 20 chunksize +tsfile_ret = ts.read_tsfile(data_dir, DEVICE_NAME, ["level", "num"], chunksize=20) +print(tsfile_ret.shape) + +# with 100 chunksize +tsfile_ret = ts.read_tsfile(data_dir, DEVICE_NAME, ["level", "num"], chunksize=100) +print(tsfile_ret.shape) + +# get all data +tsfile_ret = ts.read_tsfile(data_dir, DEVICE_NAME, ["level", "num"]) +print(tsfile_ret.shape) + +# with iterator +with ts.read_tsfile( + data_dir, DEVICE_NAME, ["level", "num"], iterator=True, chunksize=100 +) as reader: + for chunk in reader: + print(chunk.shape) + +# with time scale and chunksize +tsfile_ret = ts.read_tsfile( + data_dir, DEVICE_NAME, ["level"], start_time=50, end_time=100, chunksize=10 +) +print(tsfile_ret.shape) + +# with time scale +tsfile_ret = ts.read_tsfile(data_dir, DEVICE_NAME, ["num"], start_time=50, end_time=100) +print(tsfile_ret.shape) + +# with time scale, iterator and chunksize +with ts.read_tsfile( + data_dir, + DEVICE_NAME, + ["level", "num"], + iterator=True, + start_time=100, + end_time=500, + chunksize=100, +) as reader: + for chunk in reader: + print(chunk.shape) diff --git a/python/pom.xml b/python/pom.xml new file mode 100644 index 00000000..94856f7e --- /dev/null +++ b/python/pom.xml @@ -0,0 +1,127 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tsfile</groupId> + <artifactId>tsfile-parent</artifactId> + <version>1.0.1-SNAPSHOT</version> + </parent> + <artifactId>tsfile-python</artifactId> + <packaging>pom</packaging> + <name>TsFile: Python</name> + <properties> + <unity.version>2.6.0</unity.version> + <groovy.version>4.0.21</groovy.version> + <!-- Tell Sonar where to find the sources --> + <sonar.sources>tsfile</sonar.sources> + <sonar.cfamily.build-wrapper-output>${project.build.directory}/build-wrapper-output</sonar.cfamily.build-wrapper-output> + </properties> + <build> + <sourceDirectory>${project.basedir}</sourceDirectory> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <executions> + <!-- Create python virtual environment --> + <execution> + <id>python-venv</id> + <phase>initialize</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${python.exe.bin}</executable> + <arguments> + <argument>-m</argument> + <argument>venv</argument> + 
<argument>${project.basedir}/venv</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>python-upgrade-pip</id> + <phase>initialize</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${python.venv.bin}${python.exe.bin}</executable> + <arguments> + <argument>-m</argument> + <argument>pip</argument> + <argument>install</argument> + <argument>--upgrade</argument> + <argument>pip</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>python-install-requirements</id> + <phase>initialize</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${python.venv.bin}${python.exe.bin}</executable> + <arguments> + <argument>-m</argument> + <argument>pip</argument> + <argument>install</argument> + <argument>-r</argument> + <argument>${project.basedir}/requirements.txt</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>compile-python-code</id> + <phase>compile</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${python.venv.bin}${python.exe.bin}</executable> + <arguments> + <argument>setup.py</argument> + <argument>build_ext</argument> + <argument>--inplace</argument> + <argument>${python.compiler.argument}</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>run-python-tests</id> + <phase>test</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${python.venv.bin}${python.exe.bin}</executable> + <arguments> + <argument>${project.basedir}/test.py</argument> + </arguments> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/python/requirements.txt b/python/requirements.txt new file mode 100644 index 00000000..176150e9 --- /dev/null +++ b/python/requirements.txt @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cython==3.0.10 +numpy==1.26.4 +pandas==2.2.2 +setuptools==69.5.1 + diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 00000000..3d57e55f --- /dev/null +++ b/python/setup.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from setuptools import setup, Extension +from setuptools.command.build_ext import build_ext +from Cython.Build import cythonize +import numpy as np +import platform +import shutil +import os + + +def copy_lib_files(system, source_dir, target_dir, suffix): + + lib_file_name = f"libtsfile.{suffix}" + source = os.path.join(source_dir, lib_file_name) + target = os.path.join(target_dir, lib_file_name) + shutil.copyfile(source, target) + + if system == "Linux": + link_name = os.path.join(target_dir, f"libtsfile.so") + if os.path.exists(link_name): + os.remove(link_name) + os.symlink(lib_file_name, link_name) + + +def copy_header(source, target): + shutil.copyfile(source, target) + + +class BuildExt(build_ext): + def build_extensions(self): + numpy_include = np.get_include() + for ext in self.extensions: + ext.include_dirs.append(numpy_include) + super().build_extensions() + + +project_dir = os.path.dirname(__file__) +libtsfile_shard_dir = os.path.join(project_dir, "..", "cpp", "target", "build", "lib") +libtsfile_dir = os.path.join(project_dir, "tsfile") +include_dir = os.path.join(project_dir, "tsfile") +source_file = os.path.join(project_dir, "tsfile", "tsfile_pywrapper.pyx") + +if platform.system() == "Darwin": + copy_lib_files("Darwin", libtsfile_shard_dir, libtsfile_dir, "dylib") + copy_lib_files("Darwin", libtsfile_shard_dir, libtsfile_dir, "1.0.dylib") +elif platform.system() == "Linux": + copy_lib_files("Linux", libtsfile_shard_dir, libtsfile_dir, "so.1.0") +else: + copy_lib_files("Windows", libtsfile_shard_dir, libtsfile_dir, "dll") + copy_lib_files("Windows", libtsfile_shard_dir, libtsfile_dir, "dll.a") + + +source_include_dir = os.path.join( + project_dir, "..", "cpp", "src", "cwrapper", "TsFile-cwrapper.h" +) +target_include_dir = os.path.join(project_dir, "tsfile", "TsFile-cwrapper.h") +copy_header(source_include_dir, target_include_dir) + + +if platform.system() == "Windows": + ext_modules_tsfile = [ + Extension( + "tsfile.tsfile_pywrapper", + 
sources=[source_file], + libraries=["tsfile"], + library_dirs=[libtsfile_dir], + include_dirs=[include_dir, np.get_include()], + extra_compile_args=["-std=c++11"], + language="c++", + ) + ] +else: + ext_modules_tsfile = [ + Extension( + "tsfile.tsfile_pywrapper", + sources=[source_file], + libraries=["tsfile"], + library_dirs=[libtsfile_dir], + include_dirs=[include_dir, np.get_include()], + runtime_library_dirs=[libtsfile_dir], + extra_compile_args=["-std=c++11"], + language="c++", + ) + ] + +setup( + name="tsfile", + version="0.1", + description="Tsfile reader and writer for python", + url="https://tsfile.apache.org", + author='"Apache TsFile"', + packages=["tsfile"], + license="Apache 2.0", + ext_modules=cythonize(ext_modules_tsfile), + cmdclass={"build_ext": BuildExt}, + include_dirs=[np.get_include()], + package_data={ + "tsfile": [ + os.path.join("*tsfile", "*.so*"), + os.path.join("*tsfile", "*.dylib"), + os.path.join("*tsfile", "*.pyd"), + os.path.join("*tsfile", "*.dll"), + os.path.join("*tsfile", "*.dll.a"), + os.path.join("tsfile", "tsfile.py"), + ] + }, + include_package_data=True, +) diff --git a/python/test.py b/python/test.py new file mode 100644 index 00000000..7b0786c1 --- /dev/null +++ b/python/test.py @@ -0,0 +1,171 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import platform +import shutil +import glob + +import unittest as ut +import numpy as np +import pandas as pd + +if platform.system() == "Windows": + extra_dll_dir = os.path.join(os.path.dirname(__file__), "tsfile") + os.add_dll_directory(extra_dll_dir) + print(extra_dll_dir) + print(glob.glob(extra_dll_dir + "/*")) + +import tsfile as ts +from tsfile.tsfile import EmptyFileError + +TABLE_NAME = "test_table" +DATA_PATH = os.path.join(os.path.dirname(__file__), "data") + + +# test writing data +def test_write_tsfile(): + # test write empty data + df = pd.DataFrame() + ts.write_tsfile(DATA_PATH + "/empty.tsfile", TABLE_NAME, df) + assert not os.path.exists(DATA_PATH + "/empty.tsfile") + + # data without Time + # 1000 rows data + level = np.linspace(2000, 3000, num=1000, dtype=np.float32) + num = np.arange(10000, 11000, dtype=np.int64) + df = pd.DataFrame({"level": level, "num": num}) + with ut.TestCase().assertRaises(AttributeError): + ts.write_tsfile(DATA_PATH + "/no_time.tsfile", TABLE_NAME, df) + + # time with wrong type + time = np.arange(1, 1001, dtype=np.float32) + df = pd.DataFrame({"Time": time, "level": level, "num": num}) + with ut.TestCase().assertRaises(TypeError): + ts.write_tsfile(DATA_PATH + "/wrong_time_type.tsfile", TABLE_NAME, df) + # TXT is not support yet + time = np.arange(1, 1001, dtype=np.int64) + text = np.random.choice(["a", "b", "c"], 1000) + df = pd.DataFrame({"Time": time, "text": text}) + with ut.TestCase().assertRaises(TypeError): + ts.write_tsfile(DATA_PATH + "/txt.tsfile", TABLE_NAME, df) + + # full datatypes test + time = np.arange(1, 1001, dtype=np.int64) # int64 + level = np.linspace(2000, 3000, num=1000, dtype=np.float32) # float32 + num = np.arange(10000, 11000, dtype=np.int64) # int64 + bools = np.random.choice([True, False], 1000) # bool + double = np.random.rand(1000) # double + df = pd.DataFrame( + {"Time": 
time, "level": level, "num": num, "bools": bools, "double": double} + ) + ts.write_tsfile(DATA_PATH + "/full_datatypes.tsfile", TABLE_NAME, df) + + +# test reading data +def test_read_tsfile(): + # skip test on windows because of the bug in the tsfile library + if platform.system() == "Windows": + return + # test read a non-existent file + with ut.TestCase().assertRaises(FileNotFoundError): + ts.read_tsfile(DATA_PATH + "/notexist.tsfile", TABLE_NAME, ["level", "num"]) + + # test read empty file + with open(DATA_PATH + "/empty.tsfile", "w", encoding="utf-8") as f: + pass + + with ut.TestCase().assertRaises(EmptyFileError): + ts.read_tsfile(DATA_PATH + "/empty.tsfile", TABLE_NAME, ["level", "num"]) + + FILE_NAME = DATA_PATH + "/full_datatypes.tsfile" + # test read data + ## 1. read all data + df, _ = ts.read_tsfile(FILE_NAME, TABLE_NAME, ["level", "num", "bools", "double"]) + assert df.shape == (1000, 5) + assert df["level"].dtype == np.float32 + assert df["Time"].dtype == np.int64 + assert df["num"].dtype == np.int64 + assert df["bools"].dtype == np.bool_ + assert df["double"].dtype == np.float64 + + ## 2. read with chunksize + df, _ = ts.read_tsfile(FILE_NAME, TABLE_NAME, ["level", "num"], chunksize=100) + assert df.shape == (100, 3) + assert df["level"].dtype == np.float32 + assert df["Time"].sum() == np.arange(1, 101).sum() + + ## 3. read with iterator + chunk_num = 0 + with ts.read_tsfile( + FILE_NAME, TABLE_NAME, ["level", "num"], iterator=True, chunksize=100 + ) as reader: + for chunk, _ in reader: + assert chunk.shape == (100, 3) + assert chunk["level"].dtype == np.float32 + assert ( + chunk["Time"].sum() + == np.arange(1 + chunk_num * 100, 101 + chunk_num * 100).sum() + ) + chunk_num += 1 + assert chunk_num == 10 + + ## 4. read with time scale + df, _ = ts.read_tsfile(FILE_NAME, TABLE_NAME, ["num"], start_time=50, end_time=99) + assert df.shape == (50, 2) + assert df["num"][0] == 10049 + assert df["num"][9] == 10058 + + ## 5. 
read with time scale and chunksize + df, _ = ts.read_tsfile( + FILE_NAME, TABLE_NAME, ["num"], start_time=50, end_time=99, chunksize=10 + ) + assert df.shape == (10, 2) + assert df["num"][0] == 10049 + assert df["num"][9] == 10058 + + ## 6. read with time scale and iterator + chunk_num = 0 + with ts.read_tsfile( + FILE_NAME, + TABLE_NAME, + ["num"], + start_time=50, + end_time=99, + iterator=True, + chunksize=10, + ) as reader: + for chunk, _ in reader: + assert chunk.shape == (10, 2) + assert chunk["num"][0] == 10049 + chunk_num * 10 + assert chunk["num"][9] == 10058 + chunk_num * 10 + chunk_num += 1 + assert chunk_num == 5 + + +if __name__ == "__main__": + if os.path.exists(DATA_PATH): + print("Remove old data") + shutil.rmtree(DATA_PATH) + os.makedirs(DATA_PATH) + else: + os.makedirs(DATA_PATH) + test_write_tsfile() + test_read_tsfile() + print("All tests passed") + shutil.rmtree(DATA_PATH) diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py new file mode 100644 index 00000000..daa9e0d6 --- /dev/null +++ b/python/tsfile/__init__.py @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +from .tsfile import read_tsfile, write_tsfile diff --git a/python/tsfile/tsfile.pxd b/python/tsfile/tsfile.pxd new file mode 100644 index 00000000..a41794e2 --- /dev/null +++ b/python/tsfile/tsfile.pxd @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
#cython: language_level=3
# Cython declarations for the TsFile C wrapper (TsFile-cwrapper.h).
# These mirror the C header exactly; keep them in sync with it.

cdef extern from "limits.h":
    long long LLONG_MAX
    long long LLONG_MIN

cdef extern from "./TsFile-cwrapper.h":
    # --- common types ---
    ctypedef int ErrorCode          # 0 means success
    ctypedef long long timestamp
    ctypedef long long SchemaInfo   # packed type/encoding/compression flags

    # --- opaque handles for reading data from a tsfile ---
    ctypedef void* CTsFileReader
    ctypedef void* TsFileRowData
    ctypedef void* QueryDataRetINTERNAL
    ctypedef void* TimeFilterExpression

    cdef struct query_data_ret:
        char** column_names
        int column_num
        QueryDataRetINTERNAL data

    ctypedef query_data_ret* QueryDataRet

    # --- types for writing data to a tsfile ---
    ctypedef void* CTsFileWriter

    cdef struct column_schema:
        char* name
        SchemaInfo column_def

    ctypedef column_schema ColumnSchema

    cdef struct TableSchema:
        char* table_name
        ColumnSchema** column_schema
        int column_num

    cdef struct Tablet:
        char* table_name
        ColumnSchema** column_schema
        int column_num
        timestamp* times
        bint** bitmap
        void** value
        int cur_num
        int max_capacity

    ctypedef Tablet DataResult

    # --- reader / writer lifecycle ---
    CTsFileReader ts_reader_open(const char* path, ErrorCode* err_code)
    ErrorCode ts_reader_close(CTsFileReader reader)
    CTsFileWriter ts_writer_open(const char* path, ErrorCode* err_code)
    ErrorCode ts_writer_close(CTsFileWriter writer)

    # --- query API ---
    QueryDataRet ts_reader_begin_end(CTsFileReader reader, const char* table_name,
                                     char** columns, int column_num,
                                     timestamp start_time, timestamp end_time)
    QueryDataRet ts_reader_read(CTsFileReader reader, const char* table_name,
                                char** columns, int column_num)
    DataResult* ts_next(QueryDataRet data, int expect_line_count)
    # NOTE: the "destory" spelling matches the C API and must stay as-is.
    ErrorCode destory_query_dataret(QueryDataRet query_data_set)
    ErrorCode destory_tablet(Tablet* tablet)

    # --- write API ---
    ErrorCode tsfile_register_table(CTsFileWriter writer, TableSchema* schema)
    ErrorCode tsfile_register_table_column(CTsFileWriter writer, const char* table_name,
                                           ColumnSchema* schema)
    TsFileRowData create_tsfile_row(const char* table_name, timestamp timestamp,
                                    int column_length)
    ErrorCode insert_data_into_tsfile_row_int32(TsFileRowData row_data, char* column_name, int value)
    ErrorCode insert_data_into_tsfile_row_int64(TsFileRowData row_data, char* column_name, long long value)
    ErrorCode insert_data_into_tsfile_row_float(TsFileRowData row_data, char* column_name, float value)
    ErrorCode insert_data_into_tsfile_row_double(TsFileRowData row_data, char* column_name, double value)
    ErrorCode insert_data_into_tsfile_row_boolean(TsFileRowData row_data, char* column_name, bint value)
    ErrorCode tsfile_write_row_data(CTsFileWriter writer, TsFileRowData data)
    ErrorCode tsfile_flush_data(CTsFileWriter writer)
    ErrorCode destory_tsfile_row(TsFileRowData data)
import os
import platform
import ctypes
from typing import List, Optional, Union, overload

# On Windows the bundled native library must be loaded explicitly before the
# Cython extension module resolves symbols from it; winmode=0 keeps the legacy
# DLL search behaviour so dependent DLLs next to it are found (Python >= 3.8).
if platform.system() == "Windows":
    ctypes.CDLL(os.path.join(os.path.dirname(__file__), "libtsfile.dll"), winmode=0)
from .tsfile_pywrapper import tsfile_reader, tsfile_writer
from pandas import DataFrame

# Name of the mandatory timestamp column.
TIMESTAMP_STR = "Time"


class EmptyFileError(Exception):
    """Raised when the tsfile exists on disk but is zero bytes long."""

    def __init__(self, message="File is empty"):
        self.message = message
        super().__init__(self.message)


# NOTE: annotations use typing.Union/List instead of the PEP 604/585 forms
# (`list[str] | str`) because overload signatures are evaluated at import
# time and those forms raise TypeError on Python < 3.10 / < 3.9.

# default case -> DataFrame
@overload
def read_tsfile(
    file_path: str,
    table_name: str,
    columns: Union[List[str], str],
) -> DataFrame: ...


# case with filter -> DataFrame
@overload
def read_tsfile(
    file_path: str,
    table_name: str,
    columns: Union[List[str], str],
    filter: str,
    start_time: int,
    end_time: int,
) -> DataFrame: ...


# chunksize = int -> DataFrame
@overload
def read_tsfile(
    file_path: str,
    table_name: str,
    columns: Union[List[str], str],
    chunksize: int,
) -> DataFrame: ...


@overload
def read_tsfile(
    file_path: str,
    table_name: str,
    columns: Union[List[str], str],
    filter: str,
    start_time: int,
    end_time: int,
    chunksize: int,
) -> DataFrame: ...


# iterator = True -> Iterator
@overload
def read_tsfile(
    file_path: str,
    table_name: str,
    columns: Union[List[str], str],
    iterator: bool,
    chunksize: int,
) -> tsfile_reader: ...


@overload
def read_tsfile(
    file_path: str,
    table_name: str,
    columns: Union[List[str], str],
    start_time: int,
    end_time: int,
    iterator: bool,
    chunksize: int,
) -> tsfile_reader: ...


def read_tsfile(
    file_path: str,
    table_name: str,
    columns: Union[List[str], str],
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
    chunksize: Optional[int] = None,
    iterator: bool = False,
) -> Union[DataFrame, tsfile_reader]:
    """Read columns of one table from a tsfile.

    Args:
        file_path: path of the tsfile to read.
        table_name: table (device) to query.
        columns: a single column name or a list of names.
        start_time: optional lower time bound for the query.
        end_time: optional upper time bound for the query.
        chunksize: rows fetched per batch when iterating.
        iterator: if True, return the reader itself (iterable of chunks)
            instead of materialising everything into one DataFrame.

    Raises:
        FileNotFoundError: if `file_path` does not exist.
        EmptyFileError: if `file_path` exists but is empty.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File '{file_path}' does not exist")
    if os.path.getsize(file_path) == 0:
        raise EmptyFileError(f"File '{file_path}' is empty")
    reader = tsfile_reader(
        file_path, table_name, columns, start_time, end_time, chunksize
    )
    if iterator:
        return reader
    return reader.read_tsfile()


def write_tsfile(
    file_path: str,
    table_name: str,
    data: DataFrame,
) -> None:
    """Write `data` into `file_path` as table `table_name`.

    `data` must contain an int64 "Time" column; every other column must be
    one of int32/int64/bool/float32/float64. An empty frame is a no-op.

    Raises:
        AttributeError: if the Time column is missing.
        TypeError: if the Time column is not int64 or any column has an
            unsupported dtype.
    """
    if data.empty:
        return
    column_names = data.columns.tolist()
    column_types = data.dtypes

    if TIMESTAMP_STR not in column_names:
        raise AttributeError("Time column is missing")
    if column_types[TIMESTAMP_STR] != "int64":
        raise TypeError("Time column must be of type int64")
    allowed_types = {"int64", "int32", "bool", "float32", "float64"}

    for col, dtype in column_types.items():
        if dtype.name not in allowed_types:
            raise TypeError(f"Column '{col}' has an invalid type '{dtype}'.")

    writer = tsfile_writer(file_path)
    writer.write_tsfile(table_name, data)
#cython: language_level=3
# Cython bridge between the pandas-facing Python API (tsfile.py) and the
# TsFile C wrapper declared in tsfile.pxd.
from libc.string cimport strcpy
from libc.stdlib cimport malloc, free
import pandas as pd
from cpython.bytes cimport PyBytes_AsString
cimport numpy as cnp
import numpy as np
from .tsfile cimport *

# Required before any numpy C-API call (PyArray_SimpleNewFromData below);
# without it those calls segfault at runtime.
cnp.import_array()

TIMESTAMP_STR = "Time"
# Column-type flags; must match the SchemaInfo encoding in TsFile-cwrapper.h.
TS_TYPE_INT32 = 1 << 8
TS_TYPE_BOOLEAN = 1 << 9
TS_TYPE_FLOAT = 1 << 10
TS_TYPE_DOUBLE = 1 << 11
TS_TYPE_INT64 = 1 << 12
TS_TYPE_TEXT = 1 << 13

# numpy/pandas dtype name -> TsFile column-type flag.
type_mapping = {
    'int32': TS_TYPE_INT32,
    'bool': TS_TYPE_BOOLEAN,
    'float32': TS_TYPE_FLOAT,
    'float64': TS_TYPE_DOUBLE,
    'int64': TS_TYPE_INT64
}

cdef class tsfile_reader:
    """Reads one table of a tsfile and yields (DataFrame, not-null mask) batches."""

    cdef CTsFileReader reader
    cdef QueryDataRet ret
    cdef int batch_size
    cdef bint read_all_at_once

    def __init__(self, pathname, table_name, columns, start_time=None, end_time=None, batch_size=None):
        self.open_reader(pathname)
        self.query_data_ret(table_name, columns, start_time, end_time)

        if batch_size is not None:
            # Caller asked for chunked access (chunksize / iterator usage).
            self.batch_size = batch_size
            self.read_all_at_once = False
        else:
            self.batch_size = 1024
            self.read_all_at_once = True

    cdef open_reader(self, pathname):
        # err_code != 0 signals failure from the C side.
        cdef ErrorCode err_code
        err_code = 0
        self.reader = ts_reader_open(pathname.encode('utf-8'), &err_code)
        if (err_code != 0):
            raise Exception("Failed to open tsfile: %s, %s" %( pathname, err_code))

    cdef query_data_ret(self, table_name, columns, start_time = None, end_time=None):
        # Builds the C query handle for `columns` of `table_name`,
        # optionally bounded by [start_time, end_time].
        cdef bytes py_table_name
        cdef char** c_columns
        py_table_name = table_name.encode('utf-8')
        c_table_name = PyBytes_AsString(py_table_name)
        if isinstance(columns, str):
            columns = [columns]

        c_columns = <char**>malloc(len(columns) * sizeof(char*))
        if not c_columns:
            raise MemoryError("Failed to allocate memory for columns")

        for i in range(len(columns)):
            # Size the buffer from the *encoded* byte length, not the Python
            # character count -- non-ASCII names are longer in UTF-8 and the
            # original sizing overflowed the buffer in strcpy.
            column_binary = columns[i].encode('utf-8')
            c_columns[i] = <char*>malloc(len(column_binary) + 1)
            if not c_columns[i]:
                # Roll back the allocations made so far before bailing out.
                for j in range(i):
                    free(c_columns[j])
                free(c_columns)
                raise MemoryError("Failed to allocate memory for columns")
            column = PyBytes_AsString(column_binary)
            strcpy(c_columns[i], column)

        # Query data from tsfile; an open-ended bound defaults to the extreme.
        if start_time is not None or end_time is not None:
            if start_time is None:
                start_time = LLONG_MIN
            if end_time is None:
                end_time = LLONG_MAX
            self.ret = ts_reader_begin_end(self.reader, c_table_name, c_columns, len(columns), start_time, end_time)
        else:
            self.ret = ts_reader_read(self.reader, table_name.encode('utf-8'), c_columns, len(columns))

        # The C side duplicates the names (strdup), so free our copies now.
        for i in range(len(columns)):
            free(c_columns[i])
        free(c_columns)

    def read_tsfile(self):
        """Drains the query into one DataFrame (or a single batch when chunked).

        Returns (DataFrame, stacked not-null mask or None) and releases all
        native resources.
        """
        res = pd.DataFrame()
        not_null_maps = []
        if self.read_all_at_once:
            while True:
                chunk, not_null_map = self.get_next_dataframe()
                if chunk is not None:
                    res = pd.concat([res, chunk])
                    not_null_maps.append(not_null_map)
                else:
                    break
        else:
            res, not_null_map = self.get_next_dataframe()
            not_null_maps.append(not_null_map)

        self.free_resources()
        not_null_map_all = None
        if (not_null_maps != []):
            not_null_map_all = np.vstack(not_null_maps)
        return res, not_null_map_all

    def __iter__(self):
        return self

    def __next__(self):
        res, not_null_map = self.get_next_dataframe()
        if res is None:
            raise StopIteration
        return res, not_null_map

    def get_next_dataframe(self):
        """Fetches up to batch_size rows.

        Returns (DataFrame, per-column not-null masks) or (None, None) when
        the query is exhausted.
        """
        cdef:
            DataResult* result
            ColumnSchema* schema = NULL
            cnp.ndarray[cnp.int64_t, ndim=1, mode='c'] np_array_i64
            cnp.ndarray[cnp.int32_t, ndim=1, mode='c'] np_array_i32
            cnp.ndarray[cnp.float32_t, ndim=1, mode='c'] np_array_float
            cnp.ndarray[cnp.float64_t, ndim=1, mode='c'] np_array_double
            cnp.npy_intp length
            bint has_null
            bytes pystr
            str py_string

        res = {}
        column_order = []
        not_null_map = []

        # Time column is always placed first in the result frame.
        column_order.append(TIMESTAMP_STR)

        for i in range(self.ret.column_num):
            pystr = self.ret.column_names[i]
            py_string = pystr.decode('utf-8', 'ignore')
            column_order.append(py_string)
            res[py_string] = []

        res[TIMESTAMP_STR] = []

        if self.ret.data == NULL:
            return None, None

        result = ts_next(self.ret, self.batch_size)

        # No rows matched the query (or it is exhausted).
        if result.column_schema == NULL:
            if (destory_tablet(result) != 0):
                raise Exception("Failed to destroy tablet")
            return None, None

        # Time column.
        # NOTE(review): `cur_num + 1` implies cur_num is the highest filled
        # row index rather than a row count -- confirm against the C wrapper.
        length = result.cur_num + 1
        cdef cnp.ndarray[cnp.int64_t, ndim=1, mode='c'] data_array = \
            cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT64, result.times)
        # np.array(...) copies out of the tablet-owned buffer before the
        # tablet is destroyed below.
        res[TIMESTAMP_STR] = np.array(data_array, dtype = np.int64)

        for i in range(result.column_num):

            # Column name.
            schema = result.column_schema[i]
            pystr = schema.name
            column_name = pystr.decode('utf-8')

            # Column bitmap: a nonzero byte means the value is present.
            is_not_null = np.empty(length, dtype = bool)
            bool_ptr = <char*> result.bitmap[i]
            has_null = False
            for j in range(length):
                is_not_null[j] = bool_ptr[j] != 0
                # `not has_null` (the original used bitwise `~`, which is
                # always truthy on a bint; harmless but wrong).
                if bool_ptr[j] == 0 and not has_null:
                    has_null = True

            if schema.column_def == TS_TYPE_INT32:
                np_array_i32 = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT32, result.value[i])
                arr = np.array(np_array_i32, dtype = np.int32)

            elif schema.column_def == TS_TYPE_BOOLEAN:
                arr_bool_ = np.empty(length, dtype=np.bool_)
                bool_ptr = <char*> result.value[i]
                for j in range(length):
                    arr_bool_[j] = bool_ptr[j] != 0
                arr = np.array(arr_bool_, dtype = np.bool_)

            elif schema.column_def == TS_TYPE_FLOAT:
                np_array_float = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_FLOAT32, result.value[i])
                arr = np.array(np_array_float, dtype = np.float32)
                # Floats represent nulls directly as NaN.
                arr = np.where(is_not_null, arr, np.nan)
                res[column_name]=arr
                continue

            elif schema.column_def == TS_TYPE_DOUBLE:
                np_array_double = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_FLOAT64, result.value[i])
                arr= np.array(np_array_double, dtype = np.float64)
                arr = np.where(is_not_null, arr, np.nan)
                res[column_name]=arr
                continue

            elif schema.column_def == TS_TYPE_INT64:
                np_array_i64 = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT64, result.value[i])
                arr = np.array(np_array_i64, dtype = np.int64)
            else:
                raise Exception("UnSupport column type")

            if has_null:
                # Integer/bool columns cannot hold NaN; route through float64
                # and cast to pandas nullable dtypes where available.
                tmp_array = np.full(length, np.nan, np.float64)
                tmp_array[is_not_null] = arr[is_not_null]
                if schema.column_def == TS_TYPE_INT32:
                    arr = pd.Series(tmp_array).astype('Int32')
                elif schema.column_def == TS_TYPE_BOOLEAN:
                    # NOTE(review): casting NaN to bool_ yields True for the
                    # null slots; callers must consult the not-null mask.
                    arr = pd.Series(tmp_array).astype(np.bool_)
                elif schema.column_def == TS_TYPE_INT64:
                    arr = pd.Series(tmp_array).astype('Int64')

            res[column_name] = arr
            not_null_map.append(is_not_null)
        if (destory_tablet(result) != 0):
            raise Exception("Failed to destroy tablet")
        return pd.DataFrame(res, columns = column_order), not_null_map

    def __dealloc__(self):
        self.free_resources()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.free_resources()

    cdef free_resources(self):
        # Idempotent teardown of the native reader and query handles.
        # NOTE(review): raising from here is reachable via __dealloc__, where
        # exceptions are ignored by CPython -- consider logging instead.
        if self.reader:
            if ts_reader_close(self.reader) != 0 :
                raise Exception("Failed to close tsfile")
        if self.ret:
            if destory_query_dataret(self.ret) != 0:
                raise Exception("Failed to free query data ret")
        self.reader = NULL
        self.ret = NULL

cdef class tsfile_writer:
    """Writes a pandas DataFrame (with a mandatory "Time" column) to a tsfile."""
    cdef CTsFileWriter writer
    cdef TsFileRowData row_data

    def __init__(self, pathname):
        self.open_writer(pathname)

    cdef open_writer(self, pathname):
        cdef ErrorCode err_code
        err_code = 0
        self.writer = ts_writer_open(pathname.encode('utf-8'), &err_code)
        if (err_code != 0):
            raise Exception("Failed to open tsfile: %s, %s" %( pathname, err_code))

    def register_timeseries(self, table_name, column_name, data_type):
        """Registers one column of `table_name` with the given TS_TYPE_* flag."""
        cdef bytes py_table_name
        cdef ColumnSchema schema
        cdef bytes encoded_column_name = column_name.encode('utf-8')
        py_table_name = table_name.encode('utf-8')
        c_table_name = PyBytes_AsString(py_table_name)
        # schema.name borrows the bytes buffer; encoded_column_name stays
        # alive for the duration of this call, which is all the C side sees.
        schema.name = encoded_column_name
        schema.column_def = data_type
        if tsfile_register_table_column(self.writer, c_table_name, &schema) != 0:
            raise Exception("Failed to register timeseries")

    def resister_timeseries(self, table_name, column_name, data_type):
        # Backward-compatible alias for the original (misspelled) name.
        self.register_timeseries(table_name, column_name, data_type)

    cdef create_row_data(self, table_name, time, column_length):
        # Starts a fresh native row for the given timestamp.
        self.row_data = create_tsfile_row(table_name.encode('utf-8'), time, column_length)

    def write_into_row_data(self, column_name, value, type):
        """Inserts one typed value into the pending row."""
        cdef char* c_column_name = PyBytes_AsString(column_name.encode('utf-8'))
        if type == TS_TYPE_INT32:
            insert_data_into_tsfile_row_int32(self.row_data, c_column_name, value)
        elif type == TS_TYPE_BOOLEAN:
            insert_data_into_tsfile_row_boolean(self.row_data, c_column_name, value)
        elif type == TS_TYPE_FLOAT:
            insert_data_into_tsfile_row_float(self.row_data, c_column_name, value)
        elif type == TS_TYPE_DOUBLE:
            insert_data_into_tsfile_row_double(self.row_data, c_column_name, value)
        elif type == TS_TYPE_INT64:
            insert_data_into_tsfile_row_int64(self.row_data, c_column_name, value)
        else:
            raise TypeError("Unknown column type")

    def write_tsfile(self, table_name, df):
        """Registers df's schema, writes every row, flushes, and closes the writer."""
        column_names = df.columns.tolist()
        column_types = df.dtypes
        column_ctypes = []
        for i in range(len(column_names)):
            column_type = column_types[i].name
            if column_type in type_mapping:
                column_ctypes.append(type_mapping[column_type])
            else:
                raise TypeError("Unknown column type")

            if (column_names[i] != TIMESTAMP_STR):
                self.register_timeseries(table_name, column_names[i], column_ctypes[i])

        for i in range(len(df)):
            time = df.iloc[i][TIMESTAMP_STR]
            self.create_row_data(table_name, time, len(column_names))
            # Skip the Time column by name instead of assuming it sits at
            # index 0 (the original skipped index 0 unconditionally, which
            # silently dropped a data column whenever Time was not first).
            for j in range(len(column_names)):
                column_name = column_names[j]
                if column_name == TIMESTAMP_STR:
                    continue
                column_value = df.iloc[i][column_name]
                self.write_into_row_data(column_name, column_value, column_ctypes[j])
            if tsfile_write_row_data(self.writer, self.row_data) != 0:
                raise Exception("Failed to write row data")

        if tsfile_flush_data(self.writer) != 0:
            raise Exception("Failed to flush data")
        # NOTE(review): row_data is dropped without destory_tsfile_row here --
        # presumably tsfile_write_row_data takes ownership; confirm in the
        # C wrapper.
        self.row_data = NULL
        self.free_resources()

    def free_resources(self):
        # Idempotent close of the native writer handle.
        if self.writer != NULL:
            if ts_writer_close(self.writer) != 0:
                raise Exception("Failed to close tsfile")
            self.writer = NULL

    def __dealloc__(self):
        self.free_resources()
