This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9fd48778854f9c41077a3d1047e592a2d8469f91 Author: liuminghui233 <[email protected]> AuthorDate: Mon Nov 14 20:04:29 2022 +0800 finish python session --- client-py/README.md | 2 +- client-py/SessionExample.py | 762 ++++++++++++++++--------------- client-py/iotdb/Session.py | 224 +++++---- client-py/iotdb/dbapi/Cursor.py | 2 +- client-py/iotdb/utils/IoTDBRpcDataSet.py | 13 +- client-py/iotdb/utils/SessionDataSet.py | 66 ++- client-py/tests/test_dataframe.py | 4 +- client-py/tests/test_tablet.py | 4 +- client-py/tests/test_todf.py | 6 +- 9 files changed, 586 insertions(+), 497 deletions(-) diff --git a/client-py/README.md b/client-py/README.md index 3197cc7186..c124b32881 100644 --- a/client-py/README.md +++ b/client-py/README.md @@ -386,7 +386,7 @@ session.open(False) result = session.execute_query_statement("SELECT * FROM root.*") # Transform to Pandas Dataset -df = result.todf() +df = result.to_df() session.close() diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py index 61e82234db..4f04855def 100644 --- a/client-py/SessionExample.py +++ b/client-py/SessionExample.py @@ -35,382 +35,398 @@ password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") session.open(False) -# set and delete storage groups -session.set_storage_group("root.sg_test_01") -session.set_storage_group("root.sg_test_02") -session.set_storage_group("root.sg_test_03") -session.set_storage_group("root.sg_test_04") -session.delete_storage_group("root.sg_test_02") -session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) - -# setting time series. -session.create_time_series( - "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY -) -session.create_time_series( - "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY -) -session.create_time_series( - "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY -) -session.create_time_series( - "root.sg_test_01.d_02.s_01", - TSDataType.BOOLEAN, - TSEncoding.PLAIN, - Compressor.SNAPPY, - None, - {"tag1": "v1"}, - {"description": "v1"}, - "temperature", -) - -# setting multiple time series once. -ts_path_lst_ = [ - "root.sg_test_01.d_01.s_04", - "root.sg_test_01.d_01.s_05", - "root.sg_test_01.d_01.s_06", - "root.sg_test_01.d_01.s_07", - "root.sg_test_01.d_01.s_08", - "root.sg_test_01.d_01.s_09", -] -data_type_lst_ = [ - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, -] -encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] -compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] -session.create_multi_time_series( - ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_ -) - -ts_path_lst_ = [ - "root.sg_test_01.d_02.s_04", - "root.sg_test_01.d_02.s_05", - "root.sg_test_01.d_02.s_06", - "root.sg_test_01.d_02.s_07", - "root.sg_test_01.d_02.s_08", - "root.sg_test_01.d_02.s_09", -] -data_type_lst_ = [ - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, -] -encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] -compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] -tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))] -attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))] -session.create_multi_time_series( - ts_path_lst_, - data_type_lst_, - encoding_lst_, - compressor_lst_, - None, - tags_lst_, - attributes_lst_, - None, -) - -# delete time series -session.delete_time_series( - [ - "root.sg_test_01.d_01.s_07", - "root.sg_test_01.d_01.s_08", - "root.sg_test_01.d_01.s_09", - ] -) - -# checking time series -print( - "s_07 expecting False, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_01.s_07"), -) -print( - "s_03 expecting True, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_01.s_03"), -) -print( - "d_02.s_01 expecting True, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_02.s_01"), -) -print( - "d_02.s_06 expecting True, checking result: ", - session.check_time_series_exists("root.sg_test_01.d_02.s_06"), -) - -# insert one record into the database. -measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] -values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] -data_types_ = [ - TSDataType.BOOLEAN, - TSDataType.INT32, - TSDataType.INT64, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, +ts_path_list = [ + "root.sg1.d1.s1", + "root.sg1.d1.s2" ] -session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) - -# insert multiple records into database -measurements_list_ = [ - ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], - ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], -] -values_list_ = [ - [False, 22, 33, 4.4, 55.1, "test_records01"], - [True, 77, 88, 1.25, 8.125, "test_records02"], -] -data_type_list_ = [data_types_, data_types_] -device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"] -session.insert_records( - device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ -) - -# insert one tablet into the database. -values_ = [ - [False, 10, 11, 1.1, 10011.1, "test01"], - [True, 100, 11111, 1.25, 101.0, "test02"], - [False, 100, 1, 188.1, 688.25, "test03"], - [True, 0, 0, 0, 6.25, "test04"], -] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. -timestamps_ = [4, 5, 6, 7] -tablet_ = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ -) -session.insert_tablet(tablet_) - -# insert one numpy tablet into the database. -np_values_ = [ - np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()), - np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()), - np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()), - np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()), - np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()), - np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()), -] -np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype()) -np_tablet_ = NumpyTablet( - "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_ -) -session.insert_tablet(np_tablet_) - -# insert one unsorted numpy tablet into the database. -np_values_unsorted = [ - np.array([False, False, False, True, True], np.dtype(">?")), - np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")), - np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")), - np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")), - np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")), - np.array(["test09", "test08", "test07", "test06", "test05"]), -] -np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8")) -np_tablet_unsorted = NumpyTablet( - "root.sg_test_01.d_02", - measurements_, - data_types_, - np_values_unsorted, - np_timestamps_unsorted, -) -session.insert_tablet(np_tablet_unsorted) -print(np_tablet_unsorted.get_timestamps()) -for value in np_tablet_unsorted.get_values(): - print(value) - -# insert multiple tablets into database -tablet_01 = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11] -) -tablet_02 = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15] -) -session.insert_tablets([tablet_01, tablet_02]) - -# insert one tablet with empty cells into the database. -values_ = [ - [None, 10, 11, 1.1, 10011.1, "test01"], - [True, None, 11111, 1.25, 101.0, "test02"], - [False, 100, 1, None, 688.25, "test03"], - [True, 0, 0, 0, 6.25, None], -] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. -timestamps_ = [16, 17, 18, 19] -tablet_ = Tablet( - "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ -) -session.insert_tablet(tablet_) - -# insert records of one device -time_list = [1, 2, 3] -measurements_list = [ - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], -] -data_types_list = [ - [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], - [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], - [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], -] -values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] - -session.insert_records_of_one_device( - "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list -) - -# execute non-query sql statement -session.execute_non_query_statement( - "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" -) - -# execute sql query statement -with session.execute_query_statement( - "select * from root.sg_test_01.d_01" -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) -# execute sql query statement -with session.execute_query_statement( - "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02" -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) - -# execute statement -with session.execute_statement( - "select * from root.sg_test_01.d_01" -) as session_data_set: - while session_data_set.has_next(): - print(session_data_set.next()) - -session.execute_statement( - "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" -) - -# insert string records of one device -time_list = [1, 2, 3] -measurements_list = [ - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], - ["s_01", "s_02", "s_03"], -] -values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]] - -session.insert_string_records_of_one_device( - "root.sg_test_01.d_03", - time_list, - measurements_list, - values_list, -) - -with session.execute_raw_data_query( - ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4 -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) - -with session.execute_last_data_query( - ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0 -) as session_data_set: - session_data_set.set_fetch_size(1024) - while session_data_set.has_next(): - print(session_data_set.next()) - -# delete storage group -session.delete_storage_group("root.sg_test_01") - -# create measurement node template -template = Template(name="template_python", share_time=False) -m_node_1 = MeasurementNode( - name="s1", - data_type=TSDataType.INT64, - encoding=TSEncoding.RLE, - compression_type=Compressor.SNAPPY, -) -m_node_2 = MeasurementNode( - name="s2", - data_type=TSDataType.INT64, - encoding=TSEncoding.RLE, - compression_type=Compressor.SNAPPY, -) -m_node_3 = MeasurementNode( - name="s3", - data_type=TSDataType.INT64, - encoding=TSEncoding.RLE, - compression_type=Compressor.SNAPPY, -) -template.add_template(m_node_1) -template.add_template(m_node_2) -template.add_template(m_node_3) -session.create_schema_template(template) -print("create template success template_python") - -# create internal node template -template_name = "treeTemplate_python" -template = Template(name=template_name, share_time=True) -i_node_gps = InternalNode(name="GPS", share_time=False) -i_node_v = InternalNode(name="vehicle", share_time=True) -m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY) - -i_node_gps.add_child(m_node_x) -i_node_v.add_child(m_node_x) -template.add_template(i_node_gps) -template.add_template(i_node_v) -template.add_template(m_node_x) -session.create_schema_template(template) -print("create template success treeTemplate_python}") - -print(session.is_measurement_in_template(template_name, "GPS")) -print(session.is_measurement_in_template(template_name, "GPS.x")) -print(session.show_all_templates()) - -# # append schema template -data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE] -encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA] -compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4] - -measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"] -session.add_measurements_in_template( - template_name, - measurements_aligned_path, - data_types, - encoding_list, - compressor_list, - is_aligned=True, -) -# session.drop_schema_template("add_template_python") -measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"] -session.add_measurements_in_template( - template_name, - measurements_aligned_path, - data_types, - encoding_list, - compressor_list, - is_aligned=False, -) -session.delete_node_in_template(template_name, "aligned.s1") -print(session.count_measurements_in_template(template_name)) -print(session.is_path_exist_in_template(template_name, "aligned.s1")) -print(session.is_path_exist_in_template(template_name, "aligned.s2")) - -session.set_schema_template(template_name, "root.python.set") -print(session.show_paths_template_using_on(template_name)) -print(session.show_paths_template_set_on(template_name)) -session.unset_schema_template(template_name, "root.python.set") +fetch_args = { + "start_time": 0, + "end_time": 32, + "interval": 4, + "sliding_step": 1, + "indexes": [0, 3, 5, 9] +} + +print(session.fetch_window_batch(ts_path_list, None, fetch_args)) + +# # set and delete storage groups +# session.set_storage_group("root.sg_test_01") +# session.set_storage_group("root.sg_test_02") +# session.set_storage_group("root.sg_test_03") +# session.set_storage_group("root.sg_test_04") +# session.delete_storage_group("root.sg_test_02") +# session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) +# +# # setting time series. +# session.create_time_series( +# "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY +# ) +# session.create_time_series( +# "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY +# ) +# session.create_time_series( +# "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY +# ) +# session.create_time_series( +# "root.sg_test_01.d_02.s_01", +# TSDataType.BOOLEAN, +# TSEncoding.PLAIN, +# Compressor.SNAPPY, +# None, +# {"tag1": "v1"}, +# {"description": "v1"}, +# "temperature", +# ) +# +# # setting multiple time series once. +# ts_path_lst_ = [ +# "root.sg_test_01.d_01.s_04", +# "root.sg_test_01.d_01.s_05", +# "root.sg_test_01.d_01.s_06", +# "root.sg_test_01.d_01.s_07", +# "root.sg_test_01.d_01.s_08", +# "root.sg_test_01.d_01.s_09", +# ] +# data_type_lst_ = [ +# TSDataType.FLOAT, +# TSDataType.DOUBLE, +# TSDataType.TEXT, +# TSDataType.FLOAT, +# TSDataType.DOUBLE, +# TSDataType.TEXT, +# ] +# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +# session.create_multi_time_series( +# ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_ +# ) +# +# ts_path_lst_ = [ +# "root.sg_test_01.d_02.s_04", +# "root.sg_test_01.d_02.s_05", +# "root.sg_test_01.d_02.s_06", +# "root.sg_test_01.d_02.s_07", +# "root.sg_test_01.d_02.s_08", +# "root.sg_test_01.d_02.s_09", +# ] +# data_type_lst_ = [ +# TSDataType.FLOAT, +# TSDataType.DOUBLE, +# TSDataType.TEXT, +# TSDataType.FLOAT, +# TSDataType.DOUBLE, +# TSDataType.TEXT, +# ] +# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +# tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))] +# attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))] +# session.create_multi_time_series( +# ts_path_lst_, +# data_type_lst_, +# encoding_lst_, +# compressor_lst_, +# None, +# tags_lst_, +# attributes_lst_, +# None, +# ) +# +# # delete time series +# session.delete_time_series( +# [ +# "root.sg_test_01.d_01.s_07", +# "root.sg_test_01.d_01.s_08", +# "root.sg_test_01.d_01.s_09", +# ] +# ) +# +# # checking time series +# print( +# "s_07 expecting False, checking result: ", +# session.check_time_series_exists("root.sg_test_01.d_01.s_07"), +# ) +# print( +# "s_03 expecting True, checking result: ", +# session.check_time_series_exists("root.sg_test_01.d_01.s_03"), +# ) +# print( +# "d_02.s_01 expecting True, checking result: ", +# session.check_time_series_exists("root.sg_test_01.d_02.s_01"), +# ) +# print( +# "d_02.s_06 expecting True, checking result: ", +# session.check_time_series_exists("root.sg_test_01.d_02.s_06"), +# ) +# +# # insert one record into the database. +# measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] +# values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] +# data_types_ = [ +# TSDataType.BOOLEAN, +# TSDataType.INT32, +# TSDataType.INT64, +# TSDataType.FLOAT, +# TSDataType.DOUBLE, +# TSDataType.TEXT, +# ] +# session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) +# +# # insert multiple records into database +# measurements_list_ = [ +# ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], +# ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], +# ] +# values_list_ = [ +# [False, 22, 33, 4.4, 55.1, "test_records01"], +# [True, 77, 88, 1.25, 8.125, "test_records02"], +# ] +# data_type_list_ = [data_types_, data_types_] +# device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"] +# session.insert_records( +# device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ +# ) +# +# # insert one tablet into the database. +# values_ = [ +# [False, 10, 11, 1.1, 10011.1, "test01"], +# [True, 100, 11111, 1.25, 101.0, "test02"], +# [False, 100, 1, 188.1, 688.25, "test03"], +# [True, 0, 0, 0, 6.25, "test04"], +# ] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +# timestamps_ = [4, 5, 6, 7] +# tablet_ = Tablet( +# "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ +# ) +# session.insert_tablet(tablet_) +# +# # insert one numpy tablet into the database. +# np_values_ = [ +# np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()), +# np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()), +# np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()), +# np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()), +# np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()), +# np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()), +# ] +# np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype()) +# np_tablet_ = NumpyTablet( +# "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_ +# ) +# session.insert_tablet(np_tablet_) +# +# # insert one unsorted numpy tablet into the database. +# np_values_unsorted = [ +# np.array([False, False, False, True, True], np.dtype(">?")), +# np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")), +# np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")), +# np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")), +# np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")), +# np.array(["test09", "test08", "test07", "test06", "test05"]), +# ] +# np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8")) +# np_tablet_unsorted = NumpyTablet( +# "root.sg_test_01.d_02", +# measurements_, +# data_types_, +# np_values_unsorted, +# np_timestamps_unsorted, +# ) +# session.insert_tablet(np_tablet_unsorted) +# print(np_tablet_unsorted.get_timestamps()) +# for value in np_tablet_unsorted.get_values(): +# print(value) +# +# # insert multiple tablets into database +# tablet_01 = Tablet( +# "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11] +# ) +# tablet_02 = Tablet( +# "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15] +# ) +# session.insert_tablets([tablet_01, tablet_02]) +# +# # insert one tablet with empty cells into the database. +# values_ = [ +# [None, 10, 11, 1.1, 10011.1, "test01"], +# [True, None, 11111, 1.25, 101.0, "test02"], +# [False, 100, 1, None, 688.25, "test03"], +# [True, 0, 0, 0, 6.25, None], +# ] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +# timestamps_ = [16, 17, 18, 19] +# tablet_ = Tablet( +# "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ +# ) +# session.insert_tablet(tablet_) +# +# # insert records of one device +# time_list = [1, 2, 3] +# measurements_list = [ +# ["s_01", "s_02", "s_03"], +# ["s_01", "s_02", "s_03"], +# ["s_01", "s_02", "s_03"], +# ] +# data_types_list = [ +# [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], +# [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], +# [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], +# ] +# values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] +# +# session.insert_records_of_one_device( +# "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list +# ) +# +# # execute non-query sql statement +# session.execute_non_query_statement( +# "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" +# ) +# +# # execute sql query statement +# with session.execute_query_statement( +# "select * from root.sg_test_01.d_01" +# ) as session_data_set: +# session_data_set.set_fetch_size(1024) +# while session_data_set.has_next(): +# print(session_data_set.next()) +# # execute sql query statement +# with session.execute_query_statement( +# "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02" +# ) as session_data_set: +# session_data_set.set_fetch_size(1024) +# while session_data_set.has_next(): +# print(session_data_set.next()) +# +# # execute statement +# with session.execute_statement( +# "select * from root.sg_test_01.d_01" +# ) as session_data_set: +# while session_data_set.has_next(): +# print(session_data_set.next()) +# +# session.execute_statement( +# "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)" +# ) +# +# # insert string records of one device +# time_list = [1, 2, 3] +# measurements_list = [ +# ["s_01", "s_02", "s_03"], +# ["s_01", "s_02", "s_03"], +# ["s_01", "s_02", "s_03"], +# ] +# values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]] +# +# session.insert_string_records_of_one_device( +# "root.sg_test_01.d_03", +# time_list, +# measurements_list, +# values_list, +# ) +# +# with session.execute_raw_data_query( +# ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4 +# ) as session_data_set: +# session_data_set.set_fetch_size(1024) +# while session_data_set.has_next(): +# print(session_data_set.next()) +# +# with session.execute_last_data_query( +# ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0 +# ) as session_data_set: +# session_data_set.set_fetch_size(1024) +# while session_data_set.has_next(): +# print(session_data_set.next()) +# +# # delete storage group +# session.delete_storage_group("root.sg_test_01") +# +# # create measurement node template +# template = Template(name="template_python", share_time=False) +# m_node_1 = MeasurementNode( +# name="s1", +# data_type=TSDataType.INT64, +# encoding=TSEncoding.RLE, +# compression_type=Compressor.SNAPPY, +# ) +# m_node_2 = MeasurementNode( +# name="s2", +# data_type=TSDataType.INT64, +# encoding=TSEncoding.RLE, +# compression_type=Compressor.SNAPPY, +# ) +# m_node_3 = MeasurementNode( +# name="s3", +# data_type=TSDataType.INT64, +# encoding=TSEncoding.RLE, +# compression_type=Compressor.SNAPPY, +# ) +# template.add_template(m_node_1) +# template.add_template(m_node_2) +# template.add_template(m_node_3) +# session.create_schema_template(template) +# print("create template success template_python") +# +# # create internal node template +# template_name = "treeTemplate_python" +# template = Template(name=template_name, share_time=True) +# i_node_gps = InternalNode(name="GPS", share_time=False) +# i_node_v = InternalNode(name="vehicle", share_time=True) +# m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY) +# +# i_node_gps.add_child(m_node_x) +# i_node_v.add_child(m_node_x) +# template.add_template(i_node_gps) +# template.add_template(i_node_v) +# template.add_template(m_node_x) +# +# session.create_schema_template(template) +# print("create template success treeTemplate_python}") +# +# print(session.is_measurement_in_template(template_name, "GPS")) +# print(session.is_measurement_in_template(template_name, "GPS.x")) +# print(session.show_all_templates()) +# +# # # append schema template +# data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE] +# encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA] +# compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4] +# +# measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"] +# session.add_measurements_in_template( +# template_name, +# measurements_aligned_path, +# data_types, +# encoding_list, +# compressor_list, +# is_aligned=True, +# ) +# # session.drop_schema_template("add_template_python") +# measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"] +# session.add_measurements_in_template( +# template_name, +# measurements_aligned_path, +# data_types, +# encoding_list, +# compressor_list, +# is_aligned=False, +# ) +# session.delete_node_in_template(template_name, "aligned.s1") +# print(session.count_measurements_in_template(template_name)) +# print(session.is_path_exist_in_template(template_name, "aligned.s1")) +# print(session.is_path_exist_in_template(template_name, "aligned.s2")) +# +# session.set_schema_template(template_name, "root.python.set") +# print(session.show_paths_template_using_on(template_name)) +# print(session.show_paths_template_set_on(template_name)) +# session.unset_schema_template(template_name, "root.python.set") +# +# # drop template +# session.drop_schema_template("template_python") +# session.drop_schema_template(template_name) +# print("drop template success, template_python and treeTemplate_python") -# drop template -session.drop_schema_template("template_python") -session.drop_schema_template(template_name) -print("drop template success, template_python and treeTemplate_python") # close session connection. session.close() diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py index 471dc98b62..64be51e9ac 100644 --- a/client-py/iotdb/Session.py +++ b/client-py/iotdb/Session.py @@ -18,6 +18,8 @@ import logging import struct import time + +import numpy as np from thrift.protocol import TBinaryProtocol, TCompactProtocol from thrift.transport import TSocket, TTransport @@ -53,6 +55,8 @@ from .thrift.rpc.ttypes import ( TSRawDataQueryReq, TSLastDataQueryReq, TSInsertStringRecordsOfOneDeviceReq, + TGroupByTimeParameter, + TSFetchWindowBatchReq, ) # for debug # from IoTDBConstants import * @@ -78,13 +82,13 @@ class Session(object): DEFAULT_ZONE_ID = time.strftime("%z") def __init__( - self, - host, - port, - user=DEFAULT_USER, - password=DEFAULT_PASSWORD, - fetch_size=DEFAULT_FETCH_SIZE, - zone_id=DEFAULT_ZONE_ID, + self, + host, + port, + user=DEFAULT_USER, + password=DEFAULT_PASSWORD, + fetch_size=DEFAULT_FETCH_SIZE, + zone_id=DEFAULT_ZONE_ID, ): self.__host = host self.__port = port @@ -206,15 +210,15 @@ class Session(object): return Session.verify_success(status) def create_time_series( - self, - ts_path, - data_type, - encoding, - compressor, - props=None, - tags=None, - attributes=None, - alias=None, + self, + ts_path, + data_type, + encoding, + compressor, + props=None, + tags=None, + attributes=None, + alias=None, ): """ create single time series @@ -249,7 +253,7 @@ class Session(object): return Session.verify_success(status) def create_aligned_time_series( - self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst + self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst ): """ create aligned time series @@ -281,15 +285,15 @@ class Session(object): return Session.verify_success(status) def create_multi_time_series( - self, - ts_path_lst, - data_type_lst, - encoding_lst, - compressor_lst, - props_lst=None, - tags_lst=None, - attributes_lst=None, - alias_lst=None, + self, + ts_path_lst, + data_type_lst, + encoding_lst, + compressor_lst, + props_lst=None, + tags_lst=None, + attributes_lst=None, + alias_lst=None, ): """ create multiple time series @@ -386,7 +390,7 @@ class Session(object): return Session.verify_success(status) def insert_aligned_str_record( - self, device_id, timestamp, measurements, string_values + self, device_id, timestamp, measurements, string_values ): """special case for inserting one row of String (TEXT) value""" if type(string_values) == str: @@ -432,7 +436,7 @@ class Session(object): return Session.verify_success(status) def insert_records( - self, device_ids, times, measurements_lst, types_lst, values_lst + self, device_ids, times, measurements_lst, types_lst, values_lst ): """ insert multiple rows of data, records are independent to each other, in other words, there's no relationship @@ -460,7 +464,7 @@ class Session(object): return Session.verify_success(status) def insert_aligned_record( - self, device_id, timestamp, measurements, data_types, values + self, device_id, timestamp, measurements, data_types, values ): """ insert one row of aligned record into database, if you want improve your performance, please use insertTablet method @@ -487,7 +491,7 @@ class Session(object): return Session.verify_success(status) def insert_aligned_records( - self, device_ids, times, measurements_lst, types_lst, values_lst + self, device_ids, times, measurements_lst, types_lst, values_lst ): """ insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship @@ -515,7 +519,7 @@ class Session(object): return Session.verify_success(status) def test_insert_record( - self, device_id, timestamp, measurements, data_types, values + self, device_id, timestamp, measurements, data_types, values ): """ this method NOT insert data into database and the server just return after accept the request, this method @@ -540,7 +544,7 @@ class Session(object): return Session.verify_success(status) def test_insert_records( - self, device_ids, times, measurements_lst, types_lst, values_lst + self, device_ids, times, measurements_lst, types_lst, values_lst ): """ this method NOT insert data into database and the server just return after accept the request, this method @@ -566,7 +570,7 @@ class Session(object): return Session.verify_success(status) def gen_insert_record_req( - self, device_id, timestamp, measurements, data_types, values, is_aligned=False + self, device_id, timestamp, measurements, data_types, values, is_aligned=False ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): raise RuntimeError( @@ -583,7 +587,7 @@ class Session(object): ) def gen_insert_str_record_req( - self, device_id, timestamp, measurements, data_types, values, is_aligned=False + self, device_id, timestamp, measurements, data_types, values, is_aligned=False ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): raise RuntimeError( @@ -594,19 +598,19 @@ class Session(object): ) def gen_insert_records_req( - self, - device_ids, - times, - measurements_lst, - types_lst, - values_lst, - is_aligned=False, + self, + device_ids, + times, + measurements_lst, + types_lst, + values_lst, + is_aligned=False, ): if ( - (len(device_ids) != len(measurements_lst)) - or (len(times) != len(types_lst)) - or (len(device_ids) != len(times)) - or (len(times) != len(values_lst)) + (len(device_ids) != len(measurements_lst)) + or (len(times) != len(types_lst)) + or (len(device_ids) != len(times)) + or (len(times) != len(values_lst)) ): raise RuntimeError( "deviceIds, times, measurementsList and valuesList's size should be equal" @@ -614,7 +618,7 @@ class Session(object): value_lst = [] for values, data_types, measurements in zip( - values_lst, types_lst, measurements_lst + values_lst, types_lst, measurements_lst ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): raise RuntimeError( @@ -697,7 +701,7 @@ class Session(object): return Session.verify_success(status) def insert_records_of_one_device( - self, device_id, times_list, measurements_list, types_list, values_list + self, device_id, times_list, measurements_list, types_list, values_list ): # sort by timestamp sorted_zipped = sorted( @@ -713,7 +717,7 @@ class Session(object): ) def insert_records_of_one_device_sorted( - self, device_id, times_list, measurements_list, types_list, values_list + self, device_id, times_list, measurements_list, types_list, values_list ): """ Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc @@ -730,9 +734,9 @@ class Session(object): # check parameter size = len(times_list) if ( - size != len(measurements_list) - or size != len(types_list) - or size != len(values_list) + size != len(measurements_list) + or size != len(types_list) + or size != len(values_list) ): raise RuntimeError( "insert records of one device error: types, times, measurementsList and valuesList's size should be equal" @@ -755,7 +759,7 @@ class Session(object): return Session.verify_success(status) def insert_aligned_records_of_one_device( - self, device_id, times_list, measurements_list, types_list, values_list + self, device_id, times_list, measurements_list, types_list, values_list ): # sort by timestamp sorted_zipped = sorted( @@ -771,7 +775,7 @@ class Session(object): ) def insert_aligned_records_of_one_device_sorted( - self, device_id, times_list, measurements_list, types_list, values_list + self, device_id, times_list, measurements_list, types_list, values_list ): """ Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc @@ -787,9 +791,9 @@ class Session(object): # check parameter size = len(times_list) if ( - size != len(measurements_list) - or size != len(types_list) - or size != len(values_list) + size != len(measurements_list) + or size != len(types_list) + or size != len(values_list) ): raise RuntimeError( "insert records of one device error: types, times, measurementsList and valuesList's size should be equal" @@ -812,17 +816,17 @@ class Session(object): return Session.verify_success(status) def gen_insert_records_of_one_device_request( - self, - device_id, - times_list, - measurements_list, - values_list, - types_list, - is_aligned=False, + self, + device_id, + times_list, + measurements_list, + values_list, + types_list, + is_aligned=False, ): binary_value_list = [] for values, data_types, measurements in zip( - values_list, types_list, measurements_list + values_list, types_list, measurements_list ): data_types = [data_type.value for data_type in data_types] if (len(values) != len(data_types)) or (len(values) != len(measurements)): @@ -1037,7 +1041,7 @@ class Session(object): return -1 def execute_raw_data_query( - self, paths: list, start_time: int, end_time: int + self, paths: list, start_time: int, end_time: int ) -> SessionDataSet: """ execute query statement and returns SessionDataSet @@ -1100,12 +1104,12 @@ class Session(object): ) def insert_string_records_of_one_device( - self, - device_id: str, - times: list, - measurements_list: list, - values_list: list, - have_sorted: bool = False, + self, + device_id: str, + times: list, + measurements_list: list, + values_list: list, + have_sorted: bool = False, ): """ insert multiple row of string record into database: @@ -1132,12 +1136,12 @@ class Session(object): return Session.verify_success(status) def insert_aligned_string_records_of_one_device( - self, - device_id: str, - times: list, - measurements_list: list, - values: list, - have_sorted: bool = False, + self, + device_id: str, + times: list, + measurements_list: list, + values: list, + have_sorted: bool = False, ): if (len(times) != len(measurements_list)) or (len(times) != len(values)): raise RuntimeError( @@ -1154,13 +1158,13 @@ class Session(object): return Session.verify_success(status) def gen_insert_string_records_of_one_device_request( - self, - device_id, - times, - measurements_list, - values_list, - have_sorted, - is_aligned=False, + self, + device_id, + times, + measurements_list, + values_list, + have_sorted, + is_aligned=False, ): if (len(times) != len(measurements_list)) or (len(times) != len(values_list)): raise RuntimeError( @@ -1244,13 +1248,13 @@ class Session(object): raise RuntimeError("execution of statement fails because: ", e) def add_measurements_in_template( - self, - template_name: str, - measurements_path: list, - data_types: list, - encodings: list, - compressors: list, - is_aligned: bool = False, + self, + template_name: str, + measurements_path: list, + data_types: list, + encodings: list, + compressors: list, + is_aligned: bool = False, ): """ add measurements in the template, the template must already create. This function adds some measurements node. @@ -1452,10 +1456,38 @@ class Session(object): ) return response.measurements - def fetch_window_batch(self, query_paths : list, function_name : str, fetch_args): + def fetch_window_batch(self, query_paths: list, function_name: str, fetch_args): request = TSFetchWindowBatchReq( - self.__session_id,query_paths,function_name - fetch_args. + self.__session_id, + self.__statement_id, + query_paths, + function_name, + TGroupByTimeParameter(fetch_args["start_time"], fetch_args["end_time"], fetch_args["interval"], + fetch_args["sliding_step"], + fetch_args["indexes"]) ) - response = self.__client.fetch_window_batch(request) - return \ No newline at end of file + try: + resp = self.__client.fetchWindowBatch(request) + status = resp.status + + if Session.verify_success(status) == 0: + window_batch = [] + for window_result_set in resp.windowBatchDataSetList: + window_session_data_set = SessionDataSet.init_from_window( + resp.columnNameList, + resp.columnTypeList, + resp.columnNameIndexMap, + self.__statement_id, + self.__session_id, + window_result_set + ) + + window_df = window_session_data_set.to_df(window_session_data_set) + window_batch.append(window_df) + return np.array(window_batch) + else: + raise RuntimeError( + "execution of fetch window batch fails because: {}", status.message + ) + except TTransport.TException as e: + raise RuntimeError("execution of fetch window batch fails because: ", e) diff --git a/client-py/iotdb/dbapi/Cursor.py b/client-py/iotdb/dbapi/Cursor.py index a1d6e2caab..a6a4e24e7d 100644 --- a/client-py/iotdb/dbapi/Cursor.py +++ b/client-py/iotdb/dbapi/Cursor.py @@ -136,7 +136,7 @@ class Cursor(object): rows = [] if data_set: - data = data_set.todf() + data = data_set.to_df() if self.__sqlalchemy_mode and time_index: time_column = data.columns[0] diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py index de0fe7728c..99417b0b80 100644 --- a/client-py/iotdb/utils/IoTDBRpcDataSet.py +++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py @@ -48,6 +48,7 @@ class IoTDBRpcDataSet(object): session_id, query_data_set, fetch_size, + is_rpc_fetch_result ): self.__statement_id = statement_id self.__session_id = session_id @@ -58,6 +59,7 @@ class IoTDBRpcDataSet(object): self.__fetch_size = fetch_size self.__column_size = len(column_name_list) self.__default_time_out = 1000 + self.__is_rpc_fetch_result = is_rpc_fetch_result self.__column_name_list = [] self.__column_type_list = [] @@ -137,7 +139,7 @@ class IoTDBRpcDataSet(object): return True if self.__empty_resultSet: return False - if self.fetch_results(): + if self.__is_rpc_fetch_result and self.fetch_results(): self.construct_one_row() return True return False @@ -152,14 +154,14 @@ class IoTDBRpcDataSet(object): return True if self.__empty_resultSet: return False - if self.fetch_results(): + if self.__is_rpc_fetch_result and self.fetch_results(): return True return False def _to_bitstring(self, b): return "{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b)) - def resultset_to_pandas(self): + def resultset_to_numpy(self): result = {} for column_name in self.__column_name_list: result[column_name] = None @@ -278,9 +280,10 @@ class IoTDBRpcDataSet(object): for k, v in result.items(): if v is None: result[k] = [] + return result - df = pd.DataFrame(result) - return df + def resultset_to_pandas(self): + return pd.DataFrame(self.resultset_to_numpy()) def construct_one_row(self): # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read. diff --git a/client-py/iotdb/utils/SessionDataSet.py b/client-py/iotdb/utils/SessionDataSet.py index 02eef027df..b9c08de453 100644 --- a/client-py/iotdb/utils/SessionDataSet.py +++ b/client-py/iotdb/utils/SessionDataSet.py @@ -32,17 +32,17 @@ logger = logging.getLogger("IoTDB") class SessionDataSet(object): def __init__( - self, - sql, - column_name_list, - column_type_list, - column_name_index, - query_id, - client, - statement_id, - session_id, - query_data_set, - ignore_timestamp, + self, + sql, + column_name_list, + column_type_list, + column_name_index, + query_id, + client, + statement_id, + session_id, + query_data_set, + ignore_timestamp, ): self.iotdb_rpc_data_set = IoTDBRpcDataSet( sql, @@ -56,7 +56,32 @@ class SessionDataSet(object): session_id, query_data_set, 1024, + True + ) + + @classmethod + def init_from_window(self, column_name_list, + column_type_list, + column_name_index, + statement_id, + session_id, + query_data_set + ): + self.iotdb_rpc_data_set = IoTDBRpcDataSet( + "", + column_name_list, + column_type_list, + column_name_index, + False, + -1, + None, + statement_id, + session_id, + query_data_set, + 1024, + False ) + return self def __enter__(self): return self @@ -96,8 +121,8 @@ class SessionDataSet(object): data_set_column_index -= 1 column_name = self.iotdb_rpc_data_set.get_column_names()[index] location = ( - self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] - - IoTDBRpcDataSet.START_INDEX + self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] + - IoTDBRpcDataSet.START_INDEX ) if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index): @@ -136,9 +161,12 @@ class SessionDataSet(object): def close_operation_handle(self): self.iotdb_rpc_data_set.close() - def todf(self): + def to_df(self): return resultset_to_pandas(self) + def to_numpy(self): + return resultset_to_numpy(self) + def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame: """ @@ -150,6 +178,16 @@ def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame: return result_set.iotdb_rpc_data_set.resultset_to_pandas() +def resultset_to_numpy(result_set: SessionDataSet): + """ + Transforms a SessionDataSet from IoTDB to a Numpy array + Each Field from IoTDB is a column + :param result_set: + :return: + """ + return result_set.iotdb_rpc_data_set.resultset_to_numpy() + + def get_typed_point(field: Field, none_value=None): choices = { # In Case of Boolean, cast to 0 / 1 diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py index c7cce58ea5..0c59d6127f 100644 --- a/client-py/tests/test_dataframe.py +++ b/client-py/tests/test_dataframe.py @@ -34,7 +34,7 @@ def test_simple_query(): # Read session_data_set = session.execute_query_statement("SELECT ** FROM root") - df = session_data_set.todf() + df = session_data_set.to_df() session.close() @@ -54,7 +54,7 @@ def test_non_time_query(): # Read session_data_set = session.execute_query_statement("SHOW TIMESERIES") - df = session_data_set.todf() + df = session_data_set.to_df() session.close() diff --git a/client-py/tests/test_tablet.py b/client-py/tests/test_tablet.py index 1e80277d77..a3fb8dac6d 100644 --- a/client-py/tests/test_tablet.py +++ b/client-py/tests/test_tablet.py @@ -61,7 +61,7 @@ def test_tablet_insertion(): session_data_set = session.execute_query_statement( "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01" ) - df_output = session_data_set.todf() + df_output = session_data_set.to_df() df_output = df_output[df_input.columns.tolist()] session.close() @@ -104,7 +104,7 @@ def test_nullable_tablet_insertion(): session_data_set = session.execute_query_statement( "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01" ) - df_output = session_data_set.todf() + df_output = session_data_set.to_df() df_output = df_output[df_input.columns.tolist()] session.close() diff --git a/client-py/tests/test_todf.py b/client-py/tests/test_todf.py index 07953446cf..03fe2e76a5 100644 --- a/client-py/tests/test_todf.py +++ b/client-py/tests/test_todf.py @@ -94,7 +94,7 @@ def test_simple_query(): df_input.insert(0, "Time", timestamps) session_data_set = session.execute_query_statement("SELECT ** FROM root") - df_output = session_data_set.todf() + df_output = session_data_set.to_df() df_output = df_output[df_input.columns.tolist()] session.close() @@ -174,7 +174,7 @@ def test_with_null_query(): df_input.insert(0, "Time", timestamps) session_data_set = session.execute_query_statement("SELECT ** FROM root") - df_output = session_data_set.todf() + df_output = session_data_set.to_df() df_output = df_output[df_input.columns.tolist()] session.close() @@ -212,7 +212,7 @@ def test_multi_fetch(): session_data_set = session.execute_query_statement("SELECT ** FROM root") session_data_set.set_fetch_size(100) - df_output = session_data_set.todf() + df_output = session_data_set.to_df() df_output = df_output[df_input.columns.tolist()] session.close()
