This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch try_fix_python
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/try_fix_python by this push:
new cf4aba84 try win
cf4aba84 is described below
commit cf4aba843d8be203af006aab0bad03b7baa8c1eb
Author: HTHou <[email protected]>
AuthorDate: Mon Aug 5 00:16:52 2024 +0800
try win
---
python/setup.py | 10 +++-
python/tsfile/tsfile.py | 1 +
python/tsfile/tsfile_pywrapper.pyx | 95 +++++++++++++++++++-------------------
3 files changed, 56 insertions(+), 50 deletions(-)
diff --git a/python/setup.py b/python/setup.py
index bd438823..82aabb74 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -83,8 +83,14 @@ ext_modules_tsfile = [
libraries=["tsfile"],
library_dirs=[libtsfile_dir],
include_dirs=[include_dir, np.get_include()],
- runtime_library_dirs=[libtsfile_dir] if platform.system() != "Windows" else None,
- extra_compile_args=["-std=c++11"] if platform.system() != "Windows" else ["-std=c++11", "-DMS_WIN64"],
+ runtime_library_dirs=(
+ [libtsfile_dir] if platform.system() != "Windows" else None
+ ),
+ extra_compile_args=(
+ ["-std=c++11"]
+ if platform.system() != "Windows"
+ else ["-std=c++11", "-DMS_WIN64"]
+ ),
language="c++",
)
]
diff --git a/python/tsfile/tsfile.py b/python/tsfile/tsfile.py
index 3e92f052..425a429b 100644
--- a/python/tsfile/tsfile.py
+++ b/python/tsfile/tsfile.py
@@ -27,6 +27,7 @@ from pandas import DataFrame
TIMESTAMP_STR = "Time"
+
class EmptyFileError(Exception):
def __init__(self, message="File is empty"):
self.message = message
diff --git a/python/tsfile/tsfile_pywrapper.pyx b/python/tsfile/tsfile_pywrapper.pyx
index 3a28f3f6..4b8a8e62 100644
--- a/python/tsfile/tsfile_pywrapper.pyx
+++ b/python/tsfile/tsfile_pywrapper.pyx
@@ -23,8 +23,7 @@ import pandas as pd
from cpython.bytes cimport PyBytes_AsString
cimport numpy as cnp
import numpy as np
-from .tsfile cimport *
-import gc
+cimport tsfile as tsf
TIMESTAMP_STR = "Time"
TS_TYPE_INT32 = 1 << 8
@@ -44,14 +43,16 @@ type_mapping = {
cdef class tsfile_reader:
- cdef CTsFileReader reader
- cdef QueryDataRet ret
+ cdef tsf.CTsFileReader reader
+ cdef tsf.QueryDataRet ret
cdef int batch_size
cdef bint read_all_at_once
def __init__(self, pathname, table_name, columns, start_time=None,
end_time=None, batch_size=None):
+ self.ret = None
+ self.reader = None
self.open_reader(pathname)
self.query_data_ret(table_name, columns, start_time, end_time)
@@ -63,10 +64,10 @@ cdef class tsfile_reader:
self.read_all_at_once = True
cdef open_reader(self, pathname):
- cdef ErrorCode err_code
+ cdef tsf.ErrorCode err_code
err_code = 0
- self.reader = ts_reader_open(pathname.encode('utf-8'), &err_code)
- if (err_code != 0):
+ self.reader = tsf.ts_reader_open(pathname.encode('utf-8'), &err_code)
+ if err_code != 0:
raise Exception("Failed to open tsfile: %s, %s" %( pathname,
err_code))
cdef query_data_ret(self, table_name, columns, start_time = None,
end_time=None):
@@ -95,12 +96,12 @@ cdef class tsfile_reader:
# query data from tsfile
if start_time is not None or end_time is not None:
if start_time is None:
- start_time = LLONG_MIN
+ start_time = tsf.LLONG_MIN
if end_time is None:
- end_time = LLONG_MAX
- self.ret = ts_reader_begin_end(self.reader, c_table_name, c_columns, len(columns), start_time, end_time)
+ end_time = tsf.LLONG_MAX
+ self.ret = tsf.ts_reader_begin_end(self.reader, c_table_name, c_columns, len(columns), start_time, end_time)
else:
- self.ret = ts_reader_read(self.reader, table_name.encode('utf-8'), c_columns, len(columns))
+ self.ret = tsf.ts_reader_read(self.reader, table_name.encode('utf-8'), c_columns, len(columns))
for i in range(len(columns)):
free(c_columns[i])
@@ -117,8 +118,6 @@ cdef class tsfile_reader:
if chunk is not None:
res = pd.concat([res, chunk])
not_null_maps.append(not_null_map)
- del chunk
- gc.collect()
else:
break
else:
@@ -127,10 +126,8 @@ cdef class tsfile_reader:
self.free_resources()
not_null_map_all = None
- if (not_null_maps != []):
+ if not_null_maps:
not_null_map_all = np.vstack(not_null_maps)
- del not_null_maps
- gc.collect()
return res, not_null_map_all
def __iter__(self):
@@ -144,8 +141,8 @@ cdef class tsfile_reader:
def get_next_dataframe(self):
cdef:
- DataResult* result
- ColumnSchema* schema = NULL
+ tsf.DataResult* result
+ tsf.ColumnSchema* schema = NULL
cnp.ndarray[cnp.int64_t, ndim=1, mode='c'] np_array_i64
cnp.ndarray[cnp.int32_t, ndim=1, mode='c'] np_array_i32
cnp.ndarray[cnp.float32_t, ndim=1, mode='c'] np_array_float
@@ -165,7 +162,7 @@ cdef class tsfile_reader:
for i in range(self.ret.column_num):
pystr = self.ret.column_names[i]
- py_string = pystr.decode('utf-8', 'ignore')
+ py_string = pystr.decode(encoding='utf-8', errors='ignore')
column_order.append(py_string)
res[py_string] = []
@@ -174,12 +171,12 @@ cdef class tsfile_reader:
if self.ret.data == NULL:
return None, None
- result = ts_next(self.ret, self.batch_size)
+ result = tsf.ts_next(self.ret, self.batch_size)
# there is no data meet our requirement
if result.column_schema == NULL:
# free memory
- if (destory_tablet(result) != 0):
+ if tsf.destory_tablet(result) != 0:
raise Exception("Failed to destroy tablet")
return None, None
@@ -194,7 +191,7 @@ cdef class tsfile_reader:
# column name
schema = result.column_schema[i]
pystr = schema.name
- column_name = pystr.decode('utf-8')
+ column_name = pystr.decode(encoding='utf-8')
# column bitmap
is_not_null = np.empty(length, dtype = bool)
@@ -249,7 +246,7 @@ cdef class tsfile_reader:
res[column_name] = arr
not_null_map.append(is_not_null)
- if (destory_tablet(result) != 0):
+ if tsf.destory_tablet(result) != 0:
raise Exception("Failed to destroy tablet")
return pd.DataFrame(res, columns = column_order), not_null_map
@@ -264,54 +261,56 @@ cdef class tsfile_reader:
cdef free_resources(self):
if self.reader:
- if ts_reader_close(self.reader) != 0 :
+ if tsf.ts_reader_close(self.reader) != 0 :
raise Exception("Failed to close tsfile")
if self.ret:
- if destory_query_dataret(self.ret) != 0:
+ if tsf.destory_query_dataret(self.ret) != 0:
raise Exception("Failed to free query data ret")
self.reader = NULL
self.ret = NULL
cdef class tsfile_writer:
- cdef CTsFileWriter writer
- cdef TsFileRowData row_data
+ cdef tsf.CTsFileWriter writer
+ cdef tsf.TsFileRowData row_data
def __init__(self, pathname):
+ self.row_data = None
+ self.writer = None
self.open_writer(pathname)
cdef open_writer(self, pathname):
- cdef ErrorCode err_code
+ cdef tsf.ErrorCode err_code
err_code = 0
- self.writer = ts_writer_open(pathname.encode('utf-8'), &err_code)
- if (err_code != 0):
+ self.writer = tsf.ts_writer_open(pathname.encode('utf-8'), &err_code)
+ if err_code != 0:
raise Exception("Failed to open tsfile: %s, %s" %( pathname,
err_code))
def resister_timeseries(self, table_name, column_name, data_type):
cdef char* c_columns
cdef bytes py_table_name
- cdef ColumnSchema schema
+ cdef tsf.ColumnSchema schema
cdef bytes encoded_column_name = column_name.encode('utf-8')
py_table_name = table_name.encode('utf-8')
c_table_name = PyBytes_AsString(py_table_name)
schema.name = encoded_column_name
schema.column_def = data_type
- if tsfile_register_table_column(self.writer, c_table_name, &schema) != 0:
+ if tsf.tsfile_register_table_column(self.writer, c_table_name, &schema) != 0:
raise Exception("Failed to register timeseries")
cdef create_row_data(self, table_name, time, column_length):
- self.row_data = create_tsfile_row(table_name.encode('utf-8'), time, column_length)
- def write_into_row_data(self, column_name, value, type):
+ self.row_data = tsf.create_tsfile_row(table_name.encode('utf-8'), time, column_length)
+ def write_into_row_data(self, column_name, value, data_type):
cdef char* c_column_name = PyBytes_AsString(column_name.encode('utf-8'))
- if type == TS_TYPE_INT32:
- insert_data_into_tsfile_row_int32(self.row_data, c_column_name, value)
- elif type == TS_TYPE_BOOLEAN:
- insert_data_into_tsfile_row_boolean(self.row_data, c_column_name, value)
- elif type == TS_TYPE_FLOAT:
- insert_data_into_tsfile_row_float(self.row_data, c_column_name, value)
- elif type == TS_TYPE_DOUBLE:
- insert_data_into_tsfile_row_double(self.row_data, c_column_name, value)
- elif type == TS_TYPE_INT64:
- insert_data_into_tsfile_row_int64(self.row_data, c_column_name, value)
+ if data_type == TS_TYPE_INT32:
+ tsf.insert_data_into_tsfile_row_int32(self.row_data, c_column_name, value)
+ elif data_type == TS_TYPE_BOOLEAN:
+ tsf.insert_data_into_tsfile_row_boolean(self.row_data, c_column_name, value)
+ elif data_type == TS_TYPE_FLOAT:
+ tsf.insert_data_into_tsfile_row_float(self.row_data, c_column_name, value)
+ elif data_type == TS_TYPE_DOUBLE:
+ tsf.insert_data_into_tsfile_row_double(self.row_data, c_column_name, value)
+ elif data_type == TS_TYPE_INT64:
+ tsf.insert_data_into_tsfile_row_int64(self.row_data, c_column_name, value)
else:
raise TypeError("Unknown column type")
def write_tsfile(self, table_name, df):
@@ -325,7 +324,7 @@ cdef class tsfile_writer:
else:
raise TypeError("Unknown column type")
- if (column_names[i] != TIMESTAMP_STR):
+ if column_names[i] != TIMESTAMP_STR:
self.resister_timeseries(table_name, column_names[i],
column_ctypes[i])
@@ -337,16 +336,16 @@ cdef class tsfile_writer:
column_value = df.iloc[i][column_name]
column_ctype = column_ctypes[j]
self.write_into_row_data(column_name, column_value,
column_ctype)
- if tsfile_write_row_data(self.writer, self.row_data) != 0:
+ if tsf.tsfile_write_row_data(self.writer, self.row_data) != 0:
raise Exception("Failed to write row data")
- if tsfile_flush_data(self.writer) != 0:
+ if tsf.tsfile_flush_data(self.writer) != 0:
raise Exception("Failed to flush data")
self.row_data = NULL
self.free_resources()
def free_resources(self):
if self.writer != NULL:
- if ts_writer_close(self.writer) != 0:
+ if tsf.ts_writer_close(self.writer) != 0:
raise Exception("Failed to close tsfile")
self.writer = NULL
def __dealloc__(self):