This is an automated email from the ASF dual-hosted git repository. jfeinauer pushed a commit to branch feature/port-pr-2986-to-0-11 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit afa875e06796758ff41304d1b0e587ac628a3a05 Author: Julian <[email protected]> AuthorDate: Thu Apr 1 08:53:19 2021 +0200 [IOTDB-1273]Feature/restrucutre python module as well as supporting pandas dataframe (#2922) --- .gitignore | 3 + client-py/{src/iotdb/__init__.py => .flake8} | 17 +- client-py/.gitignore | 5 + client-py/README.md | 193 +++++++++ client-py/SessionExample.py | 177 ++++++++ client-py/SessionTest.py | 243 +++++++++++ client-py/iotdb/IoTDBContainer.py | 51 +++ client-py/{src => }/iotdb/Session.py | 453 +++++++++++++++++---- client-py/{src => }/iotdb/__init__.py | 1 - client-py/{src => }/iotdb/utils/Field.py | 10 +- client-py/{src => }/iotdb/utils/IoTDBConstants.py | 0 client-py/{src => }/iotdb/utils/IoTDBRpcDataSet.py | 91 +++-- client-py/{src => }/iotdb/utils/RowRecord.py | 5 +- client-py/{src => }/iotdb/utils/SessionDataSet.py | 109 ++++- client-py/{src => }/iotdb/utils/Tablet.py | 16 +- client-py/{src => }/iotdb/utils/__init__.py | 1 - client-py/pom.xml | 77 ++-- client-py/pypi/README.md | 73 ---- .../utils/IoTDBConstants.py => pyproject.toml} | 61 ++- client-py/readme.md | 71 ---- .../{src/iotdb/__init__.py => requirements.txt} | 4 + .../iotdb/__init__.py => requirements_dev.txt} | 6 + client-py/{pypi => }/setup.py | 16 +- client-py/src/SessionExample.py | 105 ----- client-py/{src/iotdb => tests}/__init__.py | 1 - .../IoTDBConstants.py => tests/test_dataframe.py} | 59 ++- 26 files changed, 1324 insertions(+), 524 deletions(-) diff --git a/.gitignore b/.gitignore index d10c08b..182c9e7 100644 --- a/.gitignore +++ b/.gitignore @@ -94,3 +94,6 @@ node_identifier *.cmake Makefile **/CMakeFiles/ + +# Exclude copied license +/client-py/LICENSE diff --git a/client-py/src/iotdb/__init__.py b/client-py/.flake8 similarity index 72% copy from client-py/src/iotdb/__init__.py copy to client-py/.flake8 index a4797b6..bd7e7bd 100644 --- a/client-py/src/iotdb/__init__.py +++ b/client-py/.flake8 @@ -15,4 +15,19 @@ # specific language governing permissions and 
limitations # under the License. # - +[flake8] +ignore = + E203, + W503 +max-line-length=200 +exclude = + .git, + test/*, + iotdb/thrift/**/* +extend-exclude = + dist, + build, + venv +show-source = True +statistics = True +format = %(path)s:%(row)d,%(col)d:%(code)s:%(text)s:https://lintlyci.github.io/Flake8Rules/rules/%(code)s.html diff --git a/client-py/.gitignore b/client-py/.gitignore new file mode 100644 index 0000000..0778bd5 --- /dev/null +++ b/client-py/.gitignore @@ -0,0 +1,5 @@ +/iotdb/thrift/ +# generated by Pypi +/build/ +/dist/ +/apache_iotdb.egg-info diff --git a/client-py/README.md b/client-py/README.md new file mode 100644 index 0000000..c2e03cb --- /dev/null +++ b/client-py/README.md @@ -0,0 +1,193 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +# Apache IoTDB + +[](https://www.travis-ci.org/apache/iotdb) +[](https://codecov.io/gh/thulab/iotdb) +[](https://github.com/apache/iotdb/releases) +[](https://www.apache.org/licenses/LICENSE-2.0.html) + + + + +[](https://iotdb.apache.org/) + + +Apache IoTDB (Database for Internet of Things) is an IoT native database with high performance for +data management and analysis, deployable on the edge and the cloud. 
Due to its light-weight +architecture, high performance and rich feature set together with its deep integration with +Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, +high-speed data ingestion and complex data analysis in the IoT industrial fields. + + +# Apache IoTDB Python Client API + +Using the package, you can write data to IoTDB, read data from IoTDB and maintain the schema of IoTDB. + +## Requirements + +You have to install thrift (>=0.13) before using the package. + +## How to use (Example) + +First, download the package: `pip3 install apache-iotdb` + +You can get an example of using the package to read and write data here: [Example](https://github.com/apache/iotdb/blob/rel/0.11/client-py/src/SessionExample.py) + +(you need to add `import iotdb` at the top of the file) + +Or: + +```python + +from iotdb.Session import Session + +ip = "127.0.0.1" +port_ = "6667" +username_ = 'root' +password_ = 'root' +session = Session(ip, port_, username_, password_) +session.open(False) +zone = session.get_time_zone() +session.close() + +``` + +## IoTDB Testcontainer + +The Test Support is based on the lib `testcontainers` (https://testcontainers-python.readthedocs.io/en/latest/index.html) which you need to install in your project if you want to use the feature. + +To start (and stop) an IoTDB Database in a Docker container simply do: +``` +class MyTestCase(unittest.TestCase): + + def test_something(self): + with IoTDBContainer() as c: + session = Session('localhost', c.get_exposed_port(6667), 'root', 'root') + session.open(False) + result = session.execute_query_statement("SHOW TIMESERIES") + print(result) + session.close() +``` + +By default it will load the image `apache/iotdb:latest`; if you want a specific version, just pass it as an argument, e.g. `IoTDBContainer("apache/iotdb:0.10.0")`, to get version `0.10.0` running.
+ +## Pandas Support + +To easily transform a query result to a [Pandas Dataframe](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) +the SessionDataSet has a method `.todf()` which consumes the dataset and transforms it to a pandas dataframe. + +Example: + +```python + +from iotdb.Session import Session + +ip = "127.0.0.1" +port_ = "6667" +username_ = 'root' +password_ = 'root' +session = Session(ip, port_, username_, password_) +session.open(False) +result = session.execute_query_statement("SELECT * FROM root.*") + +# Transform to Pandas Dataframe +df = result.todf() + +session.close() + +# Now you can work with the dataframe +df = ... +``` + +## Developers + +### Introduction + +This is an example of how to connect to IoTDB with python, using the thrift rpc interfaces. Things +are almost the same on Windows or Linux, but pay attention to differences such as the path separator. + +### Prerequisites + +python3.7 or later is preferred. + +You have to install Thrift (0.11.0 or later) to compile our thrift file into python code. Below is the official +installation tutorial; eventually, you should have a thrift executable. + +``` +http://thrift.apache.org/docs/install/ +``` + +Before starting you need to install `requirements_dev.txt` in your python environment, e.g. by calling +``` +pip install -r requirements_dev.txt +``` + +### Compile the thrift library and Debug + +In the root of IoTDB's source code folder, run `mvn clean generate-sources -pl client-py -am`. + +This will automatically delete and repopulate the folder `iotdb/thrift` with the generated thrift files. +This folder is ignored by git and should **never be pushed to git!** + +**Notice** Do not upload `iotdb/thrift` to the git repo. + + +### Session Client & Example + +We packed up the Thrift interface in `client-py/iotdb/Session.py` (similar to its Java counterpart), and also provided +an example file `client-py/SessionExample.py` of how to use the session module.
Please read it carefully. + + +Or, another simple example: + +```python +from iotdb.Session import Session + +ip = "127.0.0.1" +port_ = "6667" +username_ = 'root' +password_ = 'root' +session = Session(ip, port_, username_, password_) +session.open(False) +zone = session.get_time_zone() +session.close() +``` + +### Tests + +Please add your custom tests in the `tests` folder. +To run all defined tests just type `pytest .` in the root folder. + +**Notice** Some tests need docker to be started on your system as a test instance is started in a docker container using [testcontainers](https://testcontainers-python.readthedocs.io/en/latest/index.html). + +### Further Tools + +[black](https://pypi.org/project/black/) and [flake8](https://pypi.org/project/flake8/) are installed for autoformatting and linting. +Both can be run by `black .` or `flake8 .` respectively. + +## Releasing + +To do a release just ensure that you have the right set of generated thrift files. +Then run linting and auto-formatting. +Then, ensure that all tests work (via `pytest .`). +Then you are good to go to do a release! \ No newline at end of file diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py new file mode 100644 index 0000000..bf56555 --- /dev/null +++ b/client-py/SessionExample.py @@ -0,0 +1,177 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied.
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Uncomment the following line to use apache-iotdb module installed by pip3 + +from iotdb.Session import Session +from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor +from iotdb.utils.Tablet import Tablet + +# creating session connection. +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session.open(False) + +# set and delete storage groups +session.set_storage_group("root.sg_test_01") +session.set_storage_group("root.sg_test_02") +session.set_storage_group("root.sg_test_03") +session.set_storage_group("root.sg_test_04") +session.delete_storage_group("root.sg_test_02") +session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) + +# setting time series. +session.create_time_series( + "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY +) +session.create_time_series( + "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY +) +session.create_time_series( + "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY +) + +# setting multiple time series once. 
+ts_path_lst_ = [ + "root.sg_test_01.d_01.s_04", + "root.sg_test_01.d_01.s_05", + "root.sg_test_01.d_01.s_06", + "root.sg_test_01.d_01.s_07", + "root.sg_test_01.d_01.s_08", + "root.sg_test_01.d_01.s_09", +] +data_type_lst_ = [ + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +session.create_multi_time_series( + ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_ +) + +# delete time series +session.delete_time_series( + [ + "root.sg_test_01.d_01.s_07", + "root.sg_test_01.d_01.s_08", + "root.sg_test_01.d_01.s_09", + ] +) + +# checking time series +print( + "s_07 expecting False, checking result: ", + session.check_time_series_exists("root.sg_test_01.d_01.s_07"), +) +print( + "s_03 expecting True, checking result: ", + session.check_time_series_exists("root.sg_test_01.d_01.s_03"), +) + +# insert one record into the database. +measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] +values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] +data_types_ = [ + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) + +# insert multiple records into database +measurements_list_ = [ + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], +] +values_list_ = [ + [False, 22, 33, 4.4, 55.1, "test_records01"], + [True, 77, 88, 1.25, 8.125, "test_records02"], +] +data_type_list_ = [data_types_, data_types_] +device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"] +session.insert_records( + device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ +) + +# insert one tablet into the database. 
+values_ = [ + [False, 10, 11, 1.1, 10011.1, "test01"], + [True, 100, 11111, 1.25, 101.0, "test02"], + [False, 100, 1, 188.1, 688.25, "test03"], + [True, 0, 0, 0, 6.25, "test04"], +] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +timestamps_ = [4, 5, 6, 7] +tablet_ = Tablet( + "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ +) +session.insert_tablet(tablet_) + +# insert multiple tablets into database +tablet_01 = Tablet( + "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11] +) +tablet_02 = Tablet( + "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15] +) +session.insert_tablets([tablet_01, tablet_02]) + +# insert records of one device +time_list = [1, 2, 3] +measurements_list = [ + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], +] +data_types_list = [ + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], +] +values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] + +session.insert_records_of_one_device( + "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list +) + +# execute non-query sql statement +session.execute_non_query_statement( + "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" +) + +# execute sql query statement +session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01") +session_data_set.set_fetch_size(1024) +while session_data_set.has_next(): + print(session_data_set.next()) +session_data_set.close_operation_handle() + +# close session connection. 
+session.close() + +print("All executions done!!") diff --git a/client-py/SessionTest.py b/client-py/SessionTest.py new file mode 100644 index 0000000..96ff00d --- /dev/null +++ b/client-py/SessionTest.py @@ -0,0 +1,243 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Uncomment the following line to use apache-iotdb module installed by pip3 +from iotdb.Session import Session +from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor +from iotdb.utils.Tablet import Tablet + +# whether the test has passed +final_flag = True +failed_count = 0 + + +def test_fail(message): + global failed_count + global final_flag + print("*********") + print(message) + print("*********") + final_flag = False + failed_count += 1 + + +# creating session connection. 
+ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session.open(False) + +if not session.is_open(): + print("can't open session") + exit(1) + +# set and delete storage groups +session.set_storage_group("root.sg_test_01") +session.set_storage_group("root.sg_test_02") +session.set_storage_group("root.sg_test_03") +session.set_storage_group("root.sg_test_04") + +if session.delete_storage_group("root.sg_test_02") < 0: + test_fail("delete storage group failed") + +if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0: + test_fail("delete storage groups failed") + +# setting time series. +session.create_time_series( + "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY +) +session.create_time_series( + "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY +) +session.create_time_series( + "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY +) + +# setting multiple time series once. 
+ts_path_lst_ = [ + "root.sg_test_01.d_01.s_04", + "root.sg_test_01.d_01.s_05", + "root.sg_test_01.d_01.s_06", + "root.sg_test_01.d_01.s_07", + "root.sg_test_01.d_01.s_08", + "root.sg_test_01.d_01.s_09", +] +data_type_lst_ = [ + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +session.create_multi_time_series( + ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_ +) + +# delete time series +if ( + session.delete_time_series( + [ + "root.sg_test_01.d_01.s_07", + "root.sg_test_01.d_01.s_08", + "root.sg_test_01.d_01.s_09", + ] + ) + < 0 +): + test_fail("delete time series failed") + +# checking time series +# s_07 expecting False +if session.check_time_series_exists("root.sg_test_01.d_01.s_07"): + test_fail("root.sg_test_01.d_01.s_07 shouldn't exist") + +# s_03 expecting True +if not session.check_time_series_exists("root.sg_test_01.d_01.s_03"): + test_fail("root.sg_test_01.d_01.s_03 should exist") + +# insert one record into the database. 
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] +values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] +data_types_ = [ + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +if ( + session.insert_record( + "root.sg_test_01.d_01", 1, measurements_, data_types_, values_ + ) + < 0 +): + test_fail("insert record failed") + +# insert multiple records into database +measurements_list_ = [ + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], +] +values_list_ = [ + [False, 22, 33, 4.4, 55.1, "test_records01"], + [True, 77, 88, 1.25, 8.125, "test_records02"], +] +data_type_list_ = [data_types_, data_types_] +device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"] +if ( + session.insert_records( + device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ + ) + < 0 +): + test_fail("insert records failed") + +# insert one tablet into the database. +values_ = [ + [False, 10, 11, 1.1, 10011.1, "test01"], + [True, 100, 11111, 1.25, 101.0, "test02"], + [False, 100, 1, 188.1, 688.25, "test03"], + [True, 0, 0, 0, 6.25, "test04"], +] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. 
+timestamps_ = [4, 5, 6, 7] +tablet_ = Tablet( + "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ +) +if session.insert_tablet(tablet_) < 0: + test_fail("insert tablet failed") + +# insert multiple tablets into database +tablet_01 = Tablet( + "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11] +) +tablet_02 = Tablet( + "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15] +) +if session.insert_tablets([tablet_01, tablet_02]) < 0: + test_fail("insert tablets failed") + +# insert records of one device +time_list = [1, 2, 3] +measurements_list = [ + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], +] +data_types_list = [ + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], +] +values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] + +if ( + session.insert_records_of_one_device( + "root.sg_test_01.d_01", + time_list, + measurements_list, + data_types_list, + values_list, + ) + < 0 +): + test_fail("insert records of one device failed") + +# execute non-query sql statement +if ( + session.execute_non_query_statement( + "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" + ) + < 0 +): + test_fail( + "execute 'insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)' failed" + ) + +# execute sql query statement +session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01") +session_data_set.set_fetch_size(1024) +expect_count = 16 +actual_count = 0 +while session_data_set.has_next(): + actual_count += 1 +session_data_set.close_operation_handle() + +if actual_count != expect_count: + test_fail( + "query count mismatch: expect count: " + + str(expect_count) + + " actual count: " + + str(actual_count) + ) + +# close session connection. 
+session.close() + +if final_flag: + print("All executions done!!") +else: + print("Some test failed, please have a check") + print("failed count: ", failed_count) + exit(1) diff --git a/client-py/iotdb/IoTDBContainer.py b/client-py/iotdb/IoTDBContainer.py new file mode 100644 index 0000000..9a01887 --- /dev/null +++ b/client-py/iotdb/IoTDBContainer.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from os import environ + +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_container_is_ready + +from iotdb.Session import Session + + +class IoTDBContainer(DockerContainer): + IOTDB_USER = environ.get("IOTDB_USER", "root") + IOTDB_PASSWORD = environ.get("IOTDB_PASSWORD", "root") + + def _configure(self): + pass + + @wait_container_is_ready() + def _connect(self): + session = Session( + self.get_container_host_ip(), self.get_exposed_port(6667), "root", "root" + ) + session.open(False) + session.close() + + def __init__(self, image="apache/iotdb:latest", **kwargs): + super(IoTDBContainer, self).__init__(image) + self.port_to_expose = 6667 + self.with_exposed_ports(self.port_to_expose) + + def start(self): + self._configure() + super().start() + self._connect() + return self diff --git a/client-py/src/iotdb/Session.py b/client-py/iotdb/Session.py similarity index 57% rename from client-py/src/iotdb/Session.py rename to client-py/iotdb/Session.py index a9c9f07..e7a3618 100644 --- a/client-py/src/iotdb/Session.py +++ b/client-py/iotdb/Session.py @@ -17,17 +17,27 @@ # import struct -import sys import time -from .utils.SessionDataSet import SessionDataSet -from .utils.IoTDBConstants import * +from iotdb.utils.SessionDataSet import SessionDataSet from thrift.protocol import TBinaryProtocol, TCompactProtocol from thrift.transport import TSocket, TTransport -from .thrift.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertStringRecordReq, TSInsertTabletReq, \ - TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq +from .thrift.rpc.TSIService import ( + Client, + TSCreateTimeseriesReq, + TSInsertRecordReq, + TSInsertStringRecordReq, + TSInsertTabletReq, + TSExecuteStatementReq, + TSOpenSessionReq, + TSCreateMultiTimeseriesReq, + TSCloseSessionReq, + TSInsertTabletsReq, + TSInsertRecordsReq, + 
TSInsertRecordsOfOneDeviceReq, +) from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq # for debug @@ -41,15 +51,25 @@ from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZone # TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \ # TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq # from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq +from .utils.IoTDBConstants import TSDataType class Session(object): + SUCCESS_CODE = 200 DEFAULT_FETCH_SIZE = 10000 - DEFAULT_USER = 'root' - DEFAULT_PASSWORD = 'root' - DEFAULT_ZONE_ID = time.strftime('%z') - - def __init__(self, host, port, user=DEFAULT_USER, password=DEFAULT_PASSWORD, fetch_size=DEFAULT_FETCH_SIZE, zone_id=DEFAULT_ZONE_ID): + DEFAULT_USER = "root" + DEFAULT_PASSWORD = "root" + DEFAULT_ZONE_ID = time.strftime("%z") + + def __init__( + self, + host, + port, + user=DEFAULT_USER, + password=DEFAULT_PASSWORD, + fetch_size=DEFAULT_FETCH_SIZE, + zone_id=DEFAULT_ZONE_ID, + ): self.__host = host self.__port = port self.__user = user @@ -66,30 +86,37 @@ class Session(object): def open(self, enable_rpc_compression): if not self.__is_close: return - self.__transport = TTransport.TFramedTransport(TSocket.TSocket(self.__host, self.__port)) + self.__transport = TTransport.TFramedTransport( + TSocket.TSocket(self.__host, self.__port) + ) if not self.__transport.isOpen(): try: self.__transport.open() except TTransport.TTransportException as e: - print('TTransportException: ', e) + print("TTransportException: ", e) if enable_rpc_compression: self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport)) else: self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport)) - open_req = TSOpenSessionReq(client_protocol=self.protocol_version, - username=self.__user, - password=self.__password, - zoneId=self.__zone_id) + open_req = TSOpenSessionReq( 
+ client_protocol=self.protocol_version, + username=self.__user, + password=self.__password, + zoneId=self.__zone_id, + ) try: open_resp = self.__client.openSession(open_req) if self.protocol_version != open_resp.serverProtocolVersion: - print("Protocol differ, Client version is {}, but Server version is {}".format( - self.protocol_version, open_resp.serverProtocolVersion)) + print( + "Protocol differ, Client version is {}, but Server version is {}".format( + self.protocol_version, open_resp.serverProtocolVersion + ) + ) # version is less than 0.10 if open_resp.serverProtocolVersion == 0: raise TTransport.TException(message="Protocol not supported.") @@ -108,6 +135,9 @@ class Session(object): self.__is_close = False + def is_open(self): + return not self.__is_close + def close(self): if self.__is_close: return @@ -115,7 +145,10 @@ class Session(object): try: self.__client.closeSession(req) except TTransport.TException as e: - print("Error occurs when closing session at server. Maybe server is down. Error message: ", e) + print( + "Error occurs when closing session at server. Maybe server is down. Error message: ", + e, + ) finally: self.__is_close = True if self.__transport is not None: @@ -129,13 +162,15 @@ class Session(object): status = self.__client.setStorageGroup(self.__session_id, group_name) print("setting storage group {} message: {}".format(group_name, status.message)) + return Session.verify_success(status) + def delete_storage_group(self, storage_group): """ delete one storage group. :param storage_group: String, path of the target storage group. """ groups = [storage_group] - self.delete_storage_groups(groups) + return self.delete_storage_groups(groups) def delete_storage_groups(self, storage_group_lst): """ @@ -143,7 +178,13 @@ class Session(object): :param storage_group_lst: List, paths of the target storage groups. 
""" status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst) - print("delete storage group(s) {} message: {}".format(storage_group_lst, status.message)) + print( + "delete storage group(s) {} message: {}".format( + storage_group_lst, status.message + ) + ) + + return Session.verify_success(status) def create_time_series(self, ts_path, data_type, encoding, compressor): """ @@ -156,11 +197,17 @@ class Session(object): data_type = data_type.value encoding = encoding.value compressor = compressor.value - request = TSCreateTimeseriesReq(self.__session_id, ts_path, data_type, encoding, compressor) + request = TSCreateTimeseriesReq( + self.__session_id, ts_path, data_type, encoding, compressor + ) status = self.__client.createTimeseries(request) print("creating time series {} message: {}".format(ts_path, status.message)) - def create_multi_time_series(self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst): + return Session.verify_success(status) + + def create_multi_time_series( + self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst + ): """ create multiple time series :param ts_path_lst: List of String, complete time series paths (starts from root) @@ -172,10 +219,17 @@ class Session(object): encoding_lst = [encoding.value for encoding in encoding_lst] compressor_lst = [compressor.value for compressor in compressor_lst] - request = TSCreateMultiTimeseriesReq(self.__session_id, ts_path_lst, data_type_lst, - encoding_lst, compressor_lst) + request = TSCreateMultiTimeseriesReq( + self.__session_id, ts_path_lst, data_type_lst, encoding_lst, compressor_lst + ) status = self.__client.createMultiTimeseries(request) - print("creating multiple time series {} message: {}".format(ts_path_lst, status.message)) + print( + "creating multiple time series {} message: {}".format( + ts_path_lst, status.message + ) + ) + + return Session.verify_success(status) def delete_time_series(self, paths_list): """ @@ -183,7 +237,13 @@ class 
Session(object): :param paths_list: List of time series path, which should be complete (starts from root) """ status = self.__client.deleteTimeseries(self.__session_id, paths_list) - print("deleting multiple time series {} message: {}".format(paths_list, status.message)) + print( + "deleting multiple time series {} message: {}".format( + paths_list, status.message + ) + ) + + return Session.verify_success(status) def check_time_series_exists(self, path): """ @@ -211,10 +271,22 @@ class Session(object): def insert_str_record(self, device_id, timestamp, measurements, string_values): """ special case for inserting one row of String (TEXT) value """ + if type(string_values) == str: + string_values = [string_values] + if type(measurements) == str: + measurements = [measurements] data_types = [TSDataType.TEXT.value for _ in string_values] - request = self.gen_insert_str_record_req(device_id, timestamp, measurements, data_types, string_values) + request = self.gen_insert_str_record_req( + device_id, timestamp, measurements, data_types, string_values + ) status = self.__client.insertStringRecord(request) - print("insert one record to device {} message: {}".format(device_id, status.message)) + print( + "insert one record to device {} message: {}".format( + device_id, status.message + ) + ) + + return Session.verify_success(status) def insert_record(self, device_id, timestamp, measurements, data_types, values): """ @@ -229,11 +301,21 @@ class Session(object): :param values: List, values to be inserted, for each sensor """ data_types = [data_type.value for data_type in data_types] - request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values) + request = self.gen_insert_record_req( + device_id, timestamp, measurements, data_types, values + ) status = self.__client.insertRecord(request) - print("insert one record to device {} message: {}".format(device_id, status.message)) + print( + "insert one record to device {} message: {}".format( + 
device_id, status.message + ) + ) + + return Session.verify_success(status) - def insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst): + def insert_records( + self, device_ids, times, measurements_lst, types_lst, values_lst + ): """ insert multiple rows of data, records are independent to each other, in other words, there's no relationship between those records @@ -247,11 +329,21 @@ class Session(object): for types in types_lst: data_types = [data_type.value for data_type in types] type_values_lst.append(data_types) - request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst) + request = self.gen_insert_records_req( + device_ids, times, measurements_lst, type_values_lst, values_lst + ) status = self.__client.insertRecords(request) - print("insert multiple records to devices {} message: {}".format(device_ids, status.message)) + print( + "insert multiple records to devices {} message: {}".format( + device_ids, status.message + ) + ) - def test_insert_record(self, device_id, timestamp, measurements, data_types, values): + return Session.verify_success(status) + + def test_insert_record( + self, device_id, timestamp, measurements, data_types, values + ): """ this method NOT insert data into database and the server just return after accept the request, this method should be used to test other time cost in client @@ -262,11 +354,21 @@ class Session(object): :param values: List, values to be inserted, for each sensor """ data_types = [data_type.value for data_type in data_types] - request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values) + request = self.gen_insert_record_req( + device_id, timestamp, measurements, data_types, values + ) status = self.__client.testInsertRecord(request) - print("testing! insert one record to device {} message: {}".format(device_id, status.message)) + print( + "testing! 
insert one record to device {} message: {}".format( + device_id, status.message + ) + ) + + return Session.verify_success(status) - def test_insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst): + def test_insert_records( + self, device_ids, times, measurements_lst, types_lst, values_lst + ): """ this method NOT insert data into database and the server just return after accept the request, this method should be used to test other time cost in client @@ -280,43 +382,68 @@ class Session(object): for types in types_lst: data_types = [data_type.value for data_type in types] type_values_lst.append(data_types) - request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst) + request = self.gen_insert_records_req( + device_ids, times, measurements_lst, type_values_lst, values_lst + ) status = self.__client.testInsertRecords(request) print("testing! insert multiple records, message: {}".format(status.message)) - def gen_insert_record_req(self, device_id, timestamp, measurements, data_types, values): + return Session.verify_success(status) + + def gen_insert_record_req( + self, device_id, timestamp, measurements, data_types, values + ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): print("length of data types does not equal to length of values!") # could raise an error here. return values_in_bytes = Session.value_to_bytes(data_types, values) - return TSInsertRecordReq(self.__session_id, device_id, measurements, values_in_bytes, timestamp) + return TSInsertRecordReq( + self.__session_id, device_id, measurements, values_in_bytes, timestamp + ) - def gen_insert_str_record_req(self, device_id, timestamp, measurements, data_types, values): - if (len(values) != len(data_types)) or (len(values) != len(measurements)): - print("length of data types does not equal to length of values!") - # could raise an error here. 
- return - values_in_bytes = Session.value_to_bytes(data_types, values) - return TSInsertStringRecordReq(self.__session_id, device_id, measurements, values_in_bytes, timestamp) - - def gen_insert_records_req(self, device_ids, times, measurements_lst, types_lst, values_lst): - if (len(device_ids) != len(measurements_lst)) or (len(times) != len(types_lst)) or \ - (len(device_ids) != len(times)) or (len(times) != len(values_lst)): - print("deviceIds, times, measurementsList and valuesList's size should be equal") + def gen_insert_str_record_req( + self, device_id, timestamp, measurements, data_types, values + ): + if (len(values) != len(data_types)) or (len(values) != len(measurements)): + print("length of data types does not equal to length of values!") + # could raise an error here. + return + return TSInsertStringRecordReq( + self.__session_id, device_id, measurements, values, timestamp + ) + + def gen_insert_records_req( + self, device_ids, times, measurements_lst, types_lst, values_lst + ): + if ( + (len(device_ids) != len(measurements_lst)) + or (len(times) != len(types_lst)) + or (len(device_ids) != len(times)) + or (len(times) != len(values_lst)) + ): + print( + "deviceIds, times, measurementsList and valuesList's size should be equal" + ) # could raise an error here. return value_lst = [] - for values, data_types, measurements in zip(values_lst, types_lst, measurements_lst): + for values, data_types, measurements in zip( + values_lst, types_lst, measurements_lst + ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): - print("deviceIds, times, measurementsList and valuesList's size should be equal") + print( + "deviceIds, times, measurementsList and valuesList's size should be equal" + ) # could raise an error here. 
return values_in_bytes = Session.value_to_bytes(data_types, values) value_lst.append(values_in_bytes) - return TSInsertRecordsReq(self.__session_id, device_ids, measurements_lst, value_lst, times) + return TSInsertRecordsReq( + self.__session_id, device_ids, measurements_lst, value_lst, times + ) def insert_tablet(self, tablet): """ @@ -331,7 +458,13 @@ class Session(object): :param tablet: a tablet specified above """ status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet)) - print("insert one tablet to device {} message: {}".format(tablet.get_device_id(), status.message)) + print( + "insert one tablet to device {} message: {}".format( + tablet.get_device_id(), status.message + ) + ) + + return Session.verify_success(status) def insert_tablets(self, tablet_lst): """ @@ -341,6 +474,91 @@ class Session(object): status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst)) print("insert multiple tablets, message: {}".format(status.message)) + return Session.verify_success(status) + + def insert_records_of_one_device( + self, device_id, times_list, measurements_list, types_list, values_list + ): + # sort by timestamp + sorted_zipped = sorted( + zip(times_list, measurements_list, types_list, values_list) + ) + result = zip(*sorted_zipped) + times_list, measurements_list, types_list, values_list = [ + list(x) for x in result + ] + + return self.insert_records_of_one_device_sorted( + device_id, times_list, measurements_list, types_list, values_list + ) + + def insert_records_of_one_device_sorted( + self, device_id, times_list, measurements_list, types_list, values_list + ): + """ + Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc + executeBatch, we pack some insert request in batch and send them to server. 
If you want improve + your performance, please see insertTablet method + + :param device_id: device id + :param times_list: timestamps list + :param measurements_list: measurements list + :param types_list: types list + :param values_list: values list + :param have_sorted: have these list been sorted by timestamp + """ + # check parameter + size = len(times_list) + if ( + size != len(measurements_list) + or size != len(types_list) + or size != len(values_list) + ): + print( + "insert records of one device error: types, times, measurementsList and valuesList's size should be equal" + ) + return + + # check sorted + if not Session.check_sorted(times_list): + print("insert records of one device error: timestamp not sorted") + return + + request = self.gen_insert_records_of_one_device_request( + device_id, times_list, measurements_list, values_list, types_list + ) + + # send request + status = self.__client.insertRecordsOfOneDevice(request) + print("insert records of one device, message: {}".format(status.message)) + + return Session.verify_success(status) + + def gen_insert_records_of_one_device_request( + self, device_id, times_list, measurements_list, values_list, types_list + ): + binary_value_list = [] + for values, data_types, measurements in zip( + values_list, types_list, measurements_list + ): + data_types = [data_type.value for data_type in data_types] + if (len(values) != len(data_types)) or (len(values) != len(measurements)): + print( + "insert records of one device error: deviceIds, times, measurementsList and valuesList's size should be equal" + ) + # could raise an error here. 
+ return + values_in_bytes = Session.value_to_bytes(data_types, values) + binary_value_list.append(values_in_bytes) + + return TSInsertRecordsOfOneDeviceReq( + self.__session_id, + device_id, + measurements_list, + binary_value_list, + times_list, + ) + def test_insert_tablet(self, tablet): """ this method NOT insert data into database and the server just return after accept the request, this method @@ -348,7 +566,13 @@ class Session(object): :param tablet: a tablet of data """ status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet)) - print("testing! insert one tablet to device {} message: {}".format(tablet.get_device_id(), status.message)) + print( + "testing! insert one tablet to device {} message: {}".format( + tablet.get_device_id(), status.message + ) + ) + + return Session.verify_success(status) def test_insert_tablets(self, tablet_list): """ @@ -356,14 +580,24 @@ class Session(object): should be used to test other time cost in client :param tablet_list: List of tablets """ - status = self.__client.testInsertTablets(self.gen_insert_tablets_req(tablet_list)) + status = self.__client.testInsertTablets( + self.gen_insert_tablets_req(tablet_list) + ) print("testing! 
insert multiple tablets, message: {}".format(status.message)) + return Session.verify_success(status) + def gen_insert_tablet_req(self, tablet): data_type_values = [data_type.value for data_type in tablet.get_data_types()] - return TSInsertTabletReq(self.__session_id, tablet.get_device_id(), tablet.get_measurements(), - tablet.get_binary_values(), tablet.get_binary_timestamps(), - data_type_values, tablet.get_row_number()) + return TSInsertTabletReq( + self.__session_id, + tablet.get_device_id(), + tablet.get_measurements(), + tablet.get_binary_values(), + tablet.get_binary_timestamps(), + data_type_values, + tablet.get_row_number(), + ) def gen_insert_tablets_req(self, tablet_lst): device_id_lst = [] @@ -373,26 +607,46 @@ class Session(object): type_lst = [] size_lst = [] for tablet in tablet_lst: - data_type_values = [data_type.value for data_type in tablet.get_data_types()] + data_type_values = [ + data_type.value for data_type in tablet.get_data_types() + ] device_id_lst.append(tablet.get_device_id()) measurements_lst.append(tablet.get_measurements()) values_lst.append(tablet.get_binary_values()) timestamps_lst.append(tablet.get_binary_timestamps()) type_lst.append(data_type_values) size_lst.append(tablet.get_row_number()) - return TSInsertTabletsReq(self.__session_id, device_id_lst, measurements_lst, - values_lst, timestamps_lst, type_lst, size_lst) - - def execute_query_statement(self, sql): + return TSInsertTabletsReq( + self.__session_id, + device_id_lst, + measurements_lst, + values_lst, + timestamps_lst, + type_lst, + size_lst, + ) + + def execute_query_statement(self, sql, timeout=0): """ execute query sql statement and returns SessionDataSet :param sql: String, query sql statement :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py) """ - request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size) + request = TSExecuteStatementReq( + self.__session_id, sql, self.__statement_id, 
self.__fetch_size, timeout + ) resp = self.__client.executeQueryStatement(request) - return SessionDataSet(sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.queryId, - self.__client, self.__session_id, resp.queryDataSet, resp.ignoreTimeStamp) + return SessionDataSet( + sql, + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) def execute_non_query_statement(self, sql): """ @@ -403,9 +657,13 @@ class Session(object): try: resp = self.__client.executeUpdateStatement(request) status = resp.status - print("execute non-query statement {} message: {}".format(sql, status.message)) + print( + "execute non-query statement {} message: {}".format(sql, status.message) + ) + return Session.verify_success(status) except TTransport.TException as e: print("execution of non-query statement fails because: ", e) + return -1 @staticmethod def value_to_bytes(data_types, values): @@ -413,44 +671,44 @@ class Session(object): values_tobe_packed = [] for data_type, value in zip(data_types, values): if data_type == TSDataType.BOOLEAN.value: - format_str_list.append("h") + format_str_list.append("c") format_str_list.append("?") - values_tobe_packed.append(TSDataType.BOOLEAN.value) + values_tobe_packed.append(bytes([TSDataType.BOOLEAN.value])) values_tobe_packed.append(value) elif data_type == TSDataType.INT32.value: - format_str_list.append("h") + format_str_list.append("c") format_str_list.append("i") - values_tobe_packed.append(TSDataType.INT32.value) + values_tobe_packed.append(bytes([TSDataType.INT32.value])) values_tobe_packed.append(value) elif data_type == TSDataType.INT64.value: - format_str_list.append("h") + format_str_list.append("c") format_str_list.append("q") - values_tobe_packed.append(TSDataType.INT64.value) + values_tobe_packed.append(bytes([TSDataType.INT64.value])) values_tobe_packed.append(value) elif data_type == TSDataType.FLOAT.value: - 
format_str_list.append("h") + format_str_list.append("c") format_str_list.append("f") - values_tobe_packed.append(TSDataType.FLOAT.value) + values_tobe_packed.append(bytes([TSDataType.FLOAT.value])) values_tobe_packed.append(value) elif data_type == TSDataType.DOUBLE.value: - format_str_list.append("h") + format_str_list.append("c") format_str_list.append("d") - values_tobe_packed.append(TSDataType.DOUBLE.value) + values_tobe_packed.append(bytes([TSDataType.DOUBLE.value])) values_tobe_packed.append(value) elif data_type == TSDataType.TEXT.value: - value_bytes = bytes(value, 'utf-8') - format_str_list.append("h") + value_bytes = bytes(value, "utf-8") + format_str_list.append("c") format_str_list.append("i") format_str_list.append(str(len(value_bytes))) format_str_list.append("s") - values_tobe_packed.append(TSDataType.TEXT.value) + values_tobe_packed.append(bytes([TSDataType.TEXT.value])) values_tobe_packed.append(len(value_bytes)) values_tobe_packed.append(value_bytes) else: print("Unsupported data type:" + str(data_type)) # could raise an error here. 
return - format_str = ''.join(format_str_list) + format_str = "".join(format_str_list) return struct.pack(format_str, *values_tobe_packed) def get_time_zone(self): @@ -467,8 +725,31 @@ class Session(object): request = TSSetTimeZoneReq(self.__session_id, zone_id) try: status = self.__client.setTimeZone(request) - print("setting time zone_id as {}, message: {}".format(zone_id, status.message)) + print( + "setting time zone_id as {}, message: {}".format( + zone_id, status.message + ) + ) except TTransport.TException as e: print("Could not set time zone because: ", e) raise Exception self.__zone_id = zone_id + + @staticmethod + def check_sorted(timestamps): + for i in range(1, len(timestamps)): + if timestamps[i] < timestamps[i - 1]: + return False + return True + + @staticmethod + def verify_success(status): + """ + verify success of operation + :param status: execution result status + """ + if status.code == Session.SUCCESS_CODE: + return 0 + + print("error status is", status) + return -1 diff --git a/client-py/src/iotdb/__init__.py b/client-py/iotdb/__init__.py similarity index 99% copy from client-py/src/iotdb/__init__.py copy to client-py/iotdb/__init__.py index a4797b6..2a1e720 100644 --- a/client-py/src/iotdb/__init__.py +++ b/client-py/iotdb/__init__.py @@ -15,4 +15,3 @@ # specific language governing permissions and limitations # under the License. 
# - diff --git a/client-py/src/iotdb/utils/Field.py b/client-py/iotdb/utils/Field.py similarity index 96% rename from client-py/src/iotdb/utils/Field.py rename to client-py/iotdb/utils/Field.py index 55d1e33..0756b1c 100644 --- a/client-py/src/iotdb/utils/Field.py +++ b/client-py/iotdb/utils/Field.py @@ -19,11 +19,8 @@ # for package from .IoTDBConstants import TSDataType -# for debug -# from IoTDBConstants import TSDataType class Field(object): - def __init__(self, data_type): """ :param data_type: TSDataType @@ -53,7 +50,9 @@ class Field(object): elif output.get_data_type() == TSDataType.TEXT: output.set_binary_value(field.get_binary_value()) else: - raise Exception("unsupported data type {}".format(output.get_data_type())) + raise Exception( + "unsupported data type {}".format(output.get_data_type()) + ) return output def get_data_type(self): @@ -124,7 +123,7 @@ class Field(object): elif self.__data_type == TSDataType.DOUBLE: return str(self.__double_value) elif self.__data_type == TSDataType.TEXT: - return self.__binary_value.decode('utf-8') + return self.__binary_value.decode("utf-8") else: raise Exception("unsupported data type {}".format(self.__data_type)) @@ -176,4 +175,3 @@ class Field(object): else: raise Exception("unsupported data type {}".format(data_type)) return field - diff --git a/client-py/src/iotdb/utils/IoTDBConstants.py b/client-py/iotdb/utils/IoTDBConstants.py similarity index 100% copy from client-py/src/iotdb/utils/IoTDBConstants.py copy to client-py/iotdb/utils/IoTDBConstants.py diff --git a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py similarity index 78% rename from client-py/src/iotdb/utils/IoTDBRpcDataSet.py rename to client-py/iotdb/utils/IoTDBRpcDataSet.py index 21c2f0e..6920245 100644 --- a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py +++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py @@ -17,18 +17,9 @@ # # for package -from .IoTDBConstants import * - -# for debug -# from IoTDBConstants import * 
- -import sys -from os.path import dirname, abspath -path = dirname(dirname(abspath(__file__))) -sys.path.append(path) - from thrift.transport import TTransport from iotdb.thrift.rpc.TSIService import TSFetchResultsReq, TSCloseOperationReq +from iotdb.utils.IoTDBConstants import TSDataType class IoTDBRpcDataSet(object): @@ -37,8 +28,19 @@ class IoTDBRpcDataSet(object): START_INDEX = 2 FLAG = 0x80 - def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id, - client, session_id, query_data_set, fetch_size): + def __init__( + self, + sql, + column_name_list, + column_type_list, + column_name_index, + ignore_timestamp, + query_id, + client, + session_id, + query_data_set, + fetch_size, + ): self.__session_id = session_id self.__ignore_timestamp = ignore_timestamp self.__sql = sql @@ -46,6 +48,7 @@ class IoTDBRpcDataSet(object): self.__client = client self.__fetch_size = fetch_size self.__column_size = len(column_name_list) + self.__default_time_out = 1000 self.__column_name_list = [] self.__column_type_list = [] @@ -56,15 +59,21 @@ class IoTDBRpcDataSet(object): self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1 if column_name_index is not None: - self.__column_type_deduplicated_list = [None for _ in range(len(column_name_index))] + self.__column_type_deduplicated_list = [ + None for _ in range(len(column_name_index)) + ] for i in range(len(column_name_list)): name = column_name_list[i] self.__column_name_list.append(name) self.__column_type_list.append(TSDataType[column_type_list[i]]) if name not in self.__column_ordinal_dict: index = column_name_index[name] - self.__column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX - self.__column_type_deduplicated_list[index] = TSDataType[column_type_list[i]] + self.__column_ordinal_dict[name] = ( + index + IoTDBRpcDataSet.START_INDEX + ) + self.__column_type_deduplicated_list[index] = TSDataType[ + column_type_list[i] + ] else: index = 
IoTDBRpcDataSet.START_INDEX self.__column_type_deduplicated_list = [] @@ -75,10 +84,14 @@ class IoTDBRpcDataSet(object): if name not in self.__column_ordinal_dict: self.__column_ordinal_dict[name] = index index += 1 - self.__column_type_deduplicated_list.append(TSDataType[column_type_list[i]]) + self.__column_type_deduplicated_list.append( + TSDataType[column_type_list[i]] + ) self.__time_bytes = bytes(0) - self.__current_bitmap = [bytes(0) for _ in range(len(self.__column_type_deduplicated_list))] + self.__current_bitmap = [ + bytes(0) for _ in range(len(self.__column_type_deduplicated_list)) + ] self.__value = [None for _ in range(len(self.__column_type_deduplicated_list))] self.__query_data_set = query_data_set self.__is_closed = False @@ -91,8 +104,14 @@ class IoTDBRpcDataSet(object): return if self.__client is not None: try: - status = self.__client.closeOperation(TSCloseOperationReq(self.__session_id, self.__query_id)) - print("close session {}, message: {}".format(self.__session_id, status.message)) + status = self.__client.closeOperation( + TSCloseOperationReq(self.__session_id, self.__query_id) + ) + print( + "close session {}, message: {}".format( + self.__session_id, status.message + ) + ) except TTransport.TException as e: print("close session {} failed because: ".format(self.__session_id), e) raise Exception @@ -112,7 +131,9 @@ class IoTDBRpcDataSet(object): return False def has_cached_result(self): - return (self.__query_data_set is not None) and (len(self.__query_data_set.time) != 0) + return (self.__query_data_set is not None) and ( + len(self.__query_data_set.time) != 0 + ) def construct_one_row(self): # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read. 
@@ -146,9 +167,11 @@ class IoTDBRpcDataSet(object): self.__value[i] = value_buffer[:8] self.__query_data_set.valueList[i] = value_buffer[8:] elif data_type == TSDataType.TEXT: - length = int.from_bytes(value_buffer[:4], byteorder="big", signed=False) - self.__value[i] = value_buffer[4: 4 + length] - self.__query_data_set.valueList[i] = value_buffer[4 + length:] + length = int.from_bytes( + value_buffer[:4], byteorder="big", signed=False + ) + self.__value[i] = value_buffer[4 : 4 + length] + self.__query_data_set.valueList[i] = value_buffer[4 + length :] else: print("unsupported data type {}.".format(data_type)) # could raise exception here @@ -157,7 +180,14 @@ class IoTDBRpcDataSet(object): def fetch_results(self): self.__rows_index = 0 - request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True) + request = TSFetchResultsReq( + self.__session_id, + self.__sql, + self.__fetch_size, + self.__query_id, + True, + self.__default_time_out, + ) try: resp = self.__client.fetchResults(request) if not resp.hasResultSet: @@ -171,10 +201,13 @@ class IoTDBRpcDataSet(object): def is_null(self, index, row_num): bitmap = self.__current_bitmap[index] shift = row_num % 8 - return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xff)) == 0 + return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xFF)) == 0 def is_null_by_index(self, column_index): - index = self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] - IoTDBRpcDataSet.START_INDEX + index = ( + self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] + - IoTDBRpcDataSet.START_INDEX + ) # time column will never be None if index < 0: return True @@ -191,7 +224,11 @@ class IoTDBRpcDataSet(object): if column_index <= 0: raise Exception("Column index should start from 1") if column_index > len(self.__column_name_list): - raise Exception("column index {} out of range {}".format(column_index, self.__column_size)) + raise Exception( + "column index {} out of 
range {}".format( + column_index, self.__column_size + ) + ) return self.__column_name_list[column_index - 1] def get_fetch_size(self): diff --git a/client-py/src/iotdb/utils/RowRecord.py b/client-py/iotdb/utils/RowRecord.py similarity index 96% rename from client-py/src/iotdb/utils/RowRecord.py rename to client-py/iotdb/utils/RowRecord.py index 46d810e..aa51c71 100644 --- a/client-py/src/iotdb/utils/RowRecord.py +++ b/client-py/iotdb/utils/RowRecord.py @@ -24,15 +24,12 @@ from .Field import Field # from IoTDBConstants import TSDataType # from Field import Field -class RowRecord(object): +class RowRecord(object): def __init__(self, timestamp, field_list=None): self.__timestamp = timestamp self.__field_list = field_list - def add_field(self, field): - self.__field_list.append(field) - def add_field(self, value, data_type): self.__field_list.append(Field.get_field(value, data_type)) diff --git a/client-py/src/iotdb/utils/SessionDataSet.py b/client-py/iotdb/utils/SessionDataSet.py similarity index 56% rename from client-py/src/iotdb/utils/SessionDataSet.py rename to client-py/iotdb/utils/SessionDataSet.py index 91b1989..8437010 100644 --- a/client-py/src/iotdb/utils/SessionDataSet.py +++ b/client-py/iotdb/utils/SessionDataSet.py @@ -16,27 +16,43 @@ # under the License. 
# -# for package -from .IoTDBConstants import TSDataType -from .IoTDBRpcDataSet import IoTDBRpcDataSet -from .Field import Field -from .RowRecord import RowRecord +import struct -# for debug -# from IoTDBConstants import TSDataType -# from IoTDBRpcDataSet import IoTDBRpcDataSet -# from Field import Field -# from RowRecord import RowRecord +from iotdb.utils.Field import Field -import struct +# for package +from iotdb.utils.IoTDBConstants import TSDataType +from iotdb.utils.IoTDBRpcDataSet import IoTDBRpcDataSet +from iotdb.utils.RowRecord import RowRecord +import pandas as pd -class SessionDataSet(object): - def __init__(self, sql, column_name_list, column_type_list, column_name_index, query_id, client, session_id, - query_data_set, ignore_timestamp): - self.iotdb_rpc_data_set = IoTDBRpcDataSet(sql, column_name_list, column_type_list, column_name_index, - ignore_timestamp, query_id, client, session_id, query_data_set, 1024) +class SessionDataSet(object): + def __init__( + self, + sql, + column_name_list, + column_type_list, + column_name_index, + query_id, + client, + session_id, + query_data_set, + ignore_timestamp, + ): + self.iotdb_rpc_data_set = IoTDBRpcDataSet( + sql, + column_name_list, + column_type_list, + column_name_index, + ignore_timestamp, + query_id, + client, + session_id, + query_data_set, + 1024, + ) def get_fetch_size(self): return self.iotdb_rpc_data_set.get_fetch_size() @@ -69,11 +85,16 @@ class SessionDataSet(object): index -= 1 data_set_column_index -= 1 column_name = self.iotdb_rpc_data_set.get_column_names()[index] - location = self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] - IoTDBRpcDataSet.START_INDEX + location = ( + self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] + - IoTDBRpcDataSet.START_INDEX + ) if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index): value_bytes = self.iotdb_rpc_data_set.get_values()[location] - data_type = 
self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[location] + data_type = self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[ + location + ] field = Field(data_type) if data_type == TSDataType.BOOLEAN: value = struct.unpack(">?", value_bytes)[0] @@ -99,11 +120,61 @@ class SessionDataSet(object): field = Field(None) out_fields.append(field) - return RowRecord(struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields) + return RowRecord( + struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields + ) def close_operation_handle(self): self.iotdb_rpc_data_set.close() + def todf(self): + return resultset_to_pandas(self) + + +def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame: + """ + Transforms a SessionDataSet from IoTDB to a Pandas Data Frame + Each Field from IoTDB is a column in Pandas + :param result_set: + :return: + """ + # get column names and fields + column_names = result_set.get_column_names() + + value_dict = {} + + for i in range(len(column_names)): + value_dict[column_names[i]] = [] + + while result_set.has_next(): + record = result_set.next() + + value_dict["Time"].append(record.get_timestamp()) + + for col in range(len(record.get_fields())): + field: Field = record.get_fields()[col] + + value_dict[column_names[col + 1]].append(get_typed_point(field)) + + return pd.DataFrame(value_dict) + +def get_typed_point(field: Field, none_value=None): + choices = { + # In Case of Boolean, cast to 0 / 1 + TSDataType.BOOLEAN: lambda field: 1 if field.get_bool_value() else 0, + TSDataType.TEXT: lambda field: field.get_string_value(), + TSDataType.FLOAT: lambda field: field.get_float_value(), + TSDataType.INT32: lambda field: field.get_int_value(), + TSDataType.DOUBLE: lambda field: field.get_double_value(), + TSDataType.INT64: lambda field: field.get_long_value(), + } + result_next_type: TSDataType = field.get_data_type() + if result_next_type in choices.keys(): + return 
choices.get(result_next_type)(field) + elif result_next_type is None: + return none_value + else: + raise Exception(f"Unknown DataType {result_next_type}!") diff --git a/client-py/src/iotdb/utils/Tablet.py b/client-py/iotdb/utils/Tablet.py similarity index 95% rename from client-py/src/iotdb/utils/Tablet.py rename to client-py/iotdb/utils/Tablet.py index cdb0c21..444f4e8 100644 --- a/client-py/src/iotdb/utils/Tablet.py +++ b/client-py/iotdb/utils/Tablet.py @@ -16,17 +16,12 @@ # under the License. # -# for package -from .IoTDBConstants import * - -# for debug -# from IoTDBConstants import * - import struct +from iotdb.utils.IoTDBConstants import TSDataType -class Tablet(object): +class Tablet(object): def __init__(self, device_id, measurements, data_types, values, timestamps): """ creating a tablet for insertion @@ -88,7 +83,7 @@ class Tablet(object): format_str_list.append("q") values_tobe_packed.append(timestamp) - format_str = ''.join(format_str_list) + format_str = "".join(format_str_list) return struct.pack(format_str, *values_tobe_packed) def get_binary_values(self): @@ -122,7 +117,7 @@ class Tablet(object): values_tobe_packed.append(self.__values[j][i]) elif self.__data_types[i] == TSDataType.TEXT: for j in range(self.__row_number): - value_bytes = bytes(self.__values[j][i], 'utf-8') + value_bytes = bytes(self.__values[j][i], "utf-8") format_str_list.append("i") format_str_list.append(str(len(value_bytes))) format_str_list.append("s") @@ -133,6 +128,5 @@ class Tablet(object): # could raise an error here. 
return - format_str = ''.join(format_str_list) + format_str = "".join(format_str_list) return struct.pack(format_str, *values_tobe_packed) - diff --git a/client-py/src/iotdb/utils/__init__.py b/client-py/iotdb/utils/__init__.py similarity index 99% rename from client-py/src/iotdb/utils/__init__.py rename to client-py/iotdb/utils/__init__.py index a4797b6..2a1e720 100644 --- a/client-py/src/iotdb/utils/__init__.py +++ b/client-py/iotdb/utils/__init__.py @@ -15,4 +15,3 @@ # specific language governing permissions and limitations # under the License. # - diff --git a/client-py/pom.xml b/client-py/pom.xml index 9df0aea..c4278fb 100644 --- a/client-py/pom.xml +++ b/client-py/pom.xml @@ -39,7 +39,37 @@ </dependencies> <build> <plugins> - <!-- for pypi distribution --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <!-- clean thrift folder --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-clean-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <filesets> + <fileset> + <directory>iotdb</directory> + <includes> + <include>thrift/</include> + </includes> + <followSymlinks>false</followSymlinks> + </fileset> + <fileset> + <directory>./</directory> + <includes> + <include>LICENSE</include> + </includes> + </fileset> + </filesets> + </configuration> + </plugin> + <!-- fill thrift folder --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> @@ -57,16 +87,17 @@ </goals> <configuration> <encoding>utf-8</encoding> - <outputDirectory>${project.build.directory}/pypi/</outputDirectory> + <outputDirectory>${basedir}/iotdb/thrift/</outputDirectory> <resources> <resource> - <directory>${basedir}/../thrift/target/generated-sources-python</directory> + <directory>${basedir}/../thrift/target/generated-sources-python/iotdb/thrift/</directory> </resource> 
</resources> </configuration> </execution> + <!-- Copy License --> <execution> - <id>copy-license-resources</id> + <id>copy-pypi-file-resources</id> <!-- here the phase you need --> <phase>generate-sources</phase> <goals> @@ -74,10 +105,10 @@ </goals> <configuration> <encoding>utf-8</encoding> - <outputDirectory>${project.build.directory}/pypi</outputDirectory> + <outputDirectory>${basedir}/</outputDirectory> <resources> <resource> - <directory>${basedir}/../</directory> + <directory>${basedir}/..</directory> <includes> <include>LICENSE</include> </includes> @@ -85,40 +116,6 @@ </resources> </configuration> </execution> - <execution> - <id>copy-python-file-resources</id> - <!-- here the phase you need --> - <phase>generate-sources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <encoding>utf-8</encoding> - <outputDirectory>${project.build.directory}/pypi/</outputDirectory> - <resources> - <resource> - <directory>${basedir}/src/</directory> - </resource> - </resources> - </configuration> - </execution> - <execution> - <id>copy-pypi-file-resources</id> - <!-- here the phase you need --> - <phase>generate-sources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <encoding>utf-8</encoding> - <outputDirectory>${project.build.directory}/pypi</outputDirectory> - <resources> - <resource> - <directory>${basedir}/pypi</directory> - </resource> - </resources> - </configuration> - </execution> </executions> </plugin> </plugins> diff --git a/client-py/pypi/README.md b/client-py/pypi/README.md deleted file mode 100644 index dc7182c..0000000 --- a/client-py/pypi/README.md +++ /dev/null @@ -1,73 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. 
The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> - -# Apache IoTDB - -[](https://www.travis-ci.org/apache/iotdb) -[](https://codecov.io/gh/thulab/iotdb) -[](https://github.com/apache/iotdb/releases) -[](https://www.apache.org/licenses/LICENSE-2.0.html) - - - - -[](https://iotdb.apache.org/) - - -Apache IoTDB (Database for Internet of Things) is an IoT native database with high performance for -data management and analysis, deployable on the edge and the cloud. Due to its light-weight -architecture, high performance and rich feature set together with its deep integration with -Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, -high-speed data ingestion and complex data analysis in the IoT industrial fields. - - -# Apache IoTDB Python Client API - -Using the package, you can write data to IoTDB, read data from IoTDB and maintain the schema of IoTDB. - -## Requirements - -You have to install thrift (>=0.13) before using the package. 
- -## How to use (Example) - -First, download the package: `pip3 install apache-iotdb` - -You can get an example of using the package to read and write data at here: [Example](https://github.com/apache/iotdb/blob/rel/0.11/client-py/src/SessionExample.py) - -(you need to add `import iotdb` in the head of the file) - -Or: - -```python - -from iotdb.Session import Session - -ip = "127.0.0.1" -port_ = "6667" -username_ = 'root' -password_ = 'root' -session = Session(ip, port_, username_, password_) -session.open(False) -zone = session.get_time_zone() -session.close() - -``` \ No newline at end of file diff --git a/client-py/src/iotdb/utils/IoTDBConstants.py b/client-py/pyproject.toml similarity index 61% copy from client-py/src/iotdb/utils/IoTDBConstants.py copy to client-py/pyproject.toml index f053af7..389b729 100644 --- a/client-py/src/iotdb/utils/IoTDBConstants.py +++ b/client-py/pyproject.toml @@ -16,39 +16,30 @@ # under the License. # -from enum import Enum, unique +[tool.black] +line-length = 88 +target-version = ['py37'] +include = '\.pyi?$' +exclude = ''' - -@unique -class TSDataType(Enum): - BOOLEAN = 0 - INT32 = 1 - INT64 = 2 - FLOAT = 3 - DOUBLE = 4 - TEXT = 5 - - -@unique -class TSEncoding(Enum): - PLAIN = 0 - PLAIN_DICTIONARY = 1 - RLE = 2 - DIFF = 3 - TS_2DIFF = 4 - BITMAP = 5 - GORILLA_V1 = 6 - REGULAR = 7 - GORILLA = 8 - - -@unique -class Compressor(Enum): - UNCOMPRESSED = 0 - SNAPPY = 1 - GZIP = 2 - LZO = 3 - SDT = 4 - PAA = 5 - PLA = 6 - LZ4 = 7 +( + /( + \.eggs # exclude a few common directories in the + | \.git # root of the project + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | venv + | _build + | buck-out + | build + | dist + | migrations + | test + | iotdb/thrift + )/ + | foo.py # also separately exclude a file named foo.py in + # the root of the project +) +''' \ No newline at end of file diff --git a/client-py/readme.md b/client-py/readme.md index 8bedf08..e69de29 100644 --- a/client-py/readme.md +++ b/client-py/readme.md @@ -1,71 +0,0 @@ 
-<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> - -# Python Client -## Introduction -This is an example of how to connect to IoTDB with python, using the thrift rpc interfaces. Things -are almost the same on Windows or Linux, but pay attention to the difference like path separator. - -## Prerequisites -python3.7 or later is preferred. - -You have to install Thrift (0.11.1 or later) to compile our thrift file into python code. Below is the official -tutorial of installation, eventually, you should have a thrift executable. -``` -http://thrift.apache.org/docs/install/ -``` - -## Compile the thrift library and Debug - -In the root of IoTDB's source code folder, run `mvn generate-sources -pl client-py -am`. - -Then a complete project will be generated at `client-py/target/pypi` folder. -But !BE CAUTIOUS! -All your modifications in `client-py/target/pypi` must be copied manually to `client-py/src/` folder. -Otherwise once you run `mvn clean`, you will lose all your effort. - -Or, you can also copy `client-py/target/pypi/iotdb/thrift` folder to `client-py/src/thrift`, then the -`src` folder will become also a complete python project. -But !BE CAUTIOUS! -Do not upload `client-py/src/thrift` to the git repo. 
- - -## Session Client & Example -We packed up the Thrift interface in `client-py/src/iotdb/Session.py` (similar with its Java counterpart), also provided -an example file `client-py/src/SessionExample.py` of how to use the session module. please read it carefully. - - -Or, another simple example: - -```$python - -from iotdb.Session import Session - -ip = "127.0.0.1" -port_ = "6667" -username_ = 'root' -password_ = 'root' -session = Session(ip, port_, username_, password_) -session.open(False) -zone = session.get_time_zone() -session.close() - -``` \ No newline at end of file diff --git a/client-py/src/iotdb/__init__.py b/client-py/requirements.txt similarity index 92% copy from client-py/src/iotdb/__init__.py copy to client-py/requirements.txt index a4797b6..55839d9 100644 --- a/client-py/src/iotdb/__init__.py +++ b/client-py/requirements.txt @@ -16,3 +16,7 @@ # under the License. # +# Pandas Export +pandas==1.2.3 +# Testcontainer +testcontainers==3.3.0 \ No newline at end of file diff --git a/client-py/src/iotdb/__init__.py b/client-py/requirements_dev.txt similarity index 88% copy from client-py/src/iotdb/__init__.py copy to client-py/requirements_dev.txt index a4797b6..23e35d4 100644 --- a/client-py/src/iotdb/__init__.py +++ b/client-py/requirements_dev.txt @@ -16,3 +16,9 @@ # under the License. 
# +-r requirements.txt +# Pytest to run tests +pytest==6.2.2 +thrift==0.13.0 +flake8==3.9.0 +black==20.8b1 \ No newline at end of file diff --git a/client-py/pypi/setup.py b/client-py/setup.py similarity index 85% rename from client-py/pypi/setup.py rename to client-py/setup.py index a353043..d752254 100644 --- a/client-py/pypi/setup.py +++ b/client-py/setup.py @@ -21,10 +21,10 @@ import io try: - with io.open('README.md', encoding='utf-8') as f: + with io.open("README.md", encoding="utf-8") as f: long_description = f.read() except FileNotFoundError: - long_description = '' + long_description = "" print(long_description) @@ -40,8 +40,10 @@ setuptools.setup( url="https://github.com/apache/iotdb", packages=setuptools.find_packages(), install_requires=[ - 'thrift>=0.13.0', - ], + "thrift>=0.13.0", + "pandas>=1.0.0,<1.99.99", + "testcontainers>=2.0.0", + ], classifiers=[ "Programming Language :: Python :: 3", "License :: OSI Approved :: Apache Software License", @@ -49,7 +51,7 @@ setuptools.setup( "Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries :: Python Modules", ], - python_requires='>=3.7', - license='Apache License, Version 2.0', - website='https://iotdb.apache.org', + python_requires=">=3.7", + license="Apache License, Version 2.0", + website="https://iotdb.apache.org", ) diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py deleted file mode 100644 index 697d075..0000000 --- a/client-py/src/SessionExample.py +++ /dev/null @@ -1,105 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Uncomment the following line to use apache-iotdb module installed by pip3 - -from iotdb.Session import Session -from iotdb.utils.Tablet import Tablet -from iotdb.utils.IoTDBConstants import * - -# creating session connection. -ip = "127.0.0.1" -port_ = "6667" -username_ = 'root' -password_ = 'root' -session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id='UTC+8') -session.open(False) - -# set and delete storage groups -session.set_storage_group("root.sg_test_01") -session.set_storage_group("root.sg_test_02") -session.set_storage_group("root.sg_test_03") -session.set_storage_group("root.sg_test_04") -session.delete_storage_group("root.sg_test_02") -session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) - -# setting time series. -session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY) -session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY) -session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY) - -# setting multiple time series once. 
-ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06", - "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"] -data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, - TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT] -encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] -compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] -session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_) - -# delete time series -session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]) - -# checking time series -print("s_07 expecting False, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_07")) -print("s_03 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_03")) - -# insert one record into the database. -measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] -values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] -data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, - TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT] -session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) - -# insert multiple records into database -measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], - ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]] -values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"], - [True, 77, 88, 1.25, 8.125, "test_records02"]] -data_type_list_ = [data_types_, data_types_] -device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"] -session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_) - -# insert one tablet into the database. 
-values_ = [[False, 10, 11, 1.1, 10011.1, "test01"], - [True, 100, 11111, 1.25, 101.0, "test02"], - [False, 100, 1, 188.1, 688.25, "test03"], - [True, 0, 0, 0, 6.25, "test04"]] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. -timestamps_ = [4, 5, 6, 7] -tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_) -session.insert_tablet(tablet_) - -# insert multiple tablets into database -tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]) -tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]) -session.insert_tablets([tablet_01, tablet_02]) - -# execute non-query sql statement -session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);") - -# execute sql query statement -session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01") -session_data_set.set_fetch_size(1024) -while session_data_set.has_next(): - print(session_data_set.next()) -session_data_set.close_operation_handle() - -# close session connection. -session.close() - -print("All executions done!!") diff --git a/client-py/src/iotdb/__init__.py b/client-py/tests/__init__.py similarity index 99% rename from client-py/src/iotdb/__init__.py rename to client-py/tests/__init__.py index a4797b6..2a1e720 100644 --- a/client-py/src/iotdb/__init__.py +++ b/client-py/tests/__init__.py @@ -15,4 +15,3 @@ # specific language governing permissions and limitations # under the License. # - diff --git a/client-py/src/iotdb/utils/IoTDBConstants.py b/client-py/tests/test_dataframe.py similarity index 52% rename from client-py/src/iotdb/utils/IoTDBConstants.py rename to client-py/tests/test_dataframe.py index f053af7..30f38e3 100644 --- a/client-py/src/iotdb/utils/IoTDBConstants.py +++ b/client-py/tests/test_dataframe.py @@ -16,39 +16,26 @@ # under the License. 
# -from enum import Enum, unique - - -@unique -class TSDataType(Enum): - BOOLEAN = 0 - INT32 = 1 - INT64 = 2 - FLOAT = 3 - DOUBLE = 4 - TEXT = 5 - - -@unique -class TSEncoding(Enum): - PLAIN = 0 - PLAIN_DICTIONARY = 1 - RLE = 2 - DIFF = 3 - TS_2DIFF = 4 - BITMAP = 5 - GORILLA_V1 = 6 - REGULAR = 7 - GORILLA = 8 - - -@unique -class Compressor(Enum): - UNCOMPRESSED = 0 - SNAPPY = 1 - GZIP = 2 - LZO = 3 - SDT = 4 - PAA = 5 - PLA = 6 - LZ4 = 7 +from iotdb.Session import Session +from iotdb.IoTDBContainer import IoTDBContainer + +from numpy.testing import assert_array_equal + + +def test_simple_query(): + with IoTDBContainer("apache/iotdb:0.11.2") as db: + db: IoTDBContainer + session = Session(db.get_container_host_ip(), db.get_exposed_port(6667)) + session.open(False) + + # Write data + session.insert_str_record("root.device", 123, "pressure", "15.0") + + # Read + session_data_set = session.execute_query_statement("SELECT * FROM root.*") + df = session_data_set.todf() + + session.close() + + assert list(df.columns) == ["Time", "root.device.pressure"] + assert_array_equal(df.values, [[123.0, 15.0]])
