This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch support_dataframe_to_tsfile
in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit b2de41c475415333a0502714c6813a1e2f9dc6de Author: ColinLee <[email protected]> AuthorDate: Wed Jan 14 22:20:51 2026 +0800 fix. --- python/tsfile/tsfile_py_cpp.pyx | 19 +++++++++------ python/tsfile/tsfile_table_writer.py | 10 ++------ python/tsfile/utils.py | 46 +++++++++++++----------------------- 3 files changed, 30 insertions(+), 45 deletions(-) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 84c550f9..ad0ecdc3 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -310,6 +310,17 @@ cdef Tablet to_c_tablet(object tablet): cdef TSDataType pandas_dtype_to_ts_data_type(object dtype): return to_c_data_type(TSDataTypePy.from_pandas_datatype(dtype)) +cdef TSDataType check_string_or_blob(TSDataType ts_data_type, object dtype, object column_series): + if ts_data_type == TS_DATATYPE_STRING: + dtype_str = str(dtype) + if dtype == 'object' or dtype_str == "<class 'numpy.object_'>": + first_valid_idx = column_series.first_valid_index() + if first_valid_idx is not None: + first_value = column_series[first_valid_idx] + if isinstance(first_value, bytes): + return TS_DATATYPE_BLOB + return ts_data_type + cdef Tablet dataframe_to_c_tablet(object target_name, object dataframe): cdef Tablet ctablet cdef int max_row_num @@ -356,13 +367,7 @@ cdef Tablet dataframe_to_c_tablet(object target_name, object dataframe): for col_name in data_columns: pandas_dtype = dataframe[col_name].dtype ds_type = pandas_dtype_to_ts_data_type(pandas_dtype) - if ds_type == TS_DATATYPE_STRING: - column_series = dataframe[col_name] - first_valid_idx = column_series.first_valid_index() - if first_valid_idx is not None: - first_value = column_series[first_valid_idx] - if isinstance(first_value, bytes): - ds_type = TS_DATATYPE_BLOB + ds_type = check_string_or_blob(ds_type, pandas_dtype, dataframe[col_name]) column_types_list.append(ds_type) columns_names = <char**> malloc(sizeof(char *) * column_num) diff --git 
a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py index 9a8498ee..e4867c21 100644 --- a/python/tsfile/tsfile_table_writer.py +++ b/python/tsfile/tsfile_table_writer.py @@ -21,6 +21,7 @@ from tsfile import TableSchema, Tablet, TableNotExistError from tsfile import TsFileWriter from tsfile.constants import TSDataType from tsfile.exceptions import ColumnNotExistError, TypeMismatchError +from tsfile.utils import check_string_or_blob class TsFileTableWriter: @@ -94,14 +95,7 @@ class TsFileTableWriter: df_dtype = dataframe[df_col_name_original].dtype df_ts_type = TSDataType.from_pandas_datatype(df_dtype) - - if df_ts_type == TSDataType.STRING and (df_dtype == 'object' or str(df_dtype) == "<class 'numpy.object_'>"): - column_series = dataframe[df_col_name_original] - first_valid_idx = column_series.first_valid_index() - if first_valid_idx is not None: - first_value = column_series[first_valid_idx] - if isinstance(first_value, bytes): - df_ts_type = TSDataType.BLOB + df_ts_type = check_string_or_blob(df_ts_type, df_dtype, dataframe[df_col_name_original]) schema_col = schema_column_map[col_name] expected_ts_type = schema_col.get_data_type() diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index a19b7e0e..d2c19428 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -27,6 +27,16 @@ from tsfile.tsfile_reader import TsFileReaderPy from tsfile import ColumnSchema, TableSchema, ColumnCategory, TSDataType, TsFileTableWriter +def check_string_or_blob(ts_data_type: TSDataType, dtype, column_series: pd.Series) -> TSDataType: + if ts_data_type == TSDataType.STRING and (dtype == 'object' or str(dtype) == "<class 'numpy.object_'>"): + first_valid_idx = column_series.first_valid_index() + if first_valid_idx is not None: + first_value = column_series[first_valid_idx] + if isinstance(first_value, bytes): + return TSDataType.BLOB + return ts_data_type + + def to_dataframe(file_path: str, table_name: Optional[str] = None, column_names: 
Optional[list[str]] = None, @@ -163,7 +173,7 @@ def to_dataframe(file_path: str, return pd.DataFrame() -def to_tsfile(dataframe: pd.DataFrame, +def dataframe_to_tsfile(dataframe: pd.DataFrame, file_path: str, table_name: Optional[str] = None, time_column: Optional[str] = None, @@ -209,38 +219,30 @@ def to_tsfile(dataframe: pd.DataFrame, if dataframe is None or dataframe.empty: raise ValueError("DataFrame cannot be None or empty") - # Determine table name if table_name is None: table_name = "table" - # Determine time column time_col_name = None if time_column is not None: - # Check if specified time column exists if time_column not in dataframe.columns: raise ValueError(f"Time column '{time_column}' not found in DataFrame") - # Check if time column is integer type (int64 or int) if not is_integer_dtype(dataframe[time_column].dtype): raise TypeError(f"Time column '{time_column}' must be integer type (int64 or int), got {dataframe[time_column].dtype}") time_col_name = time_column else: - # Look for 'time' column (case-insensitive) for col in dataframe.columns: if col.lower() == 'time': - # Check if time column is integer type if is_integer_dtype(dataframe[col].dtype): time_col_name = col break else: raise TypeError(f"Time column '{col}' must be integer type (int64 or int), got {dataframe[col].dtype}") - # Get data columns (excluding time column) data_columns = [col for col in dataframe.columns if col != time_col_name] if len(data_columns) == 0: raise ValueError("DataFrame must have at least one data column besides the time column") - # Normalize tag_column list (convert to lowercase for comparison) tag_columns_lower = [] if tag_column is not None: for tag_col in tag_column: @@ -248,23 +250,12 @@ def to_tsfile(dataframe: pd.DataFrame, raise ValueError(f"Tag column '{tag_col}' not found in DataFrame") tag_columns_lower.append(tag_col.lower()) - # Infer schema from DataFrame columns column_schemas = [] for col_name in data_columns: - # Infer data type from DataFrame 
column col_dtype = dataframe[col_name].dtype ts_data_type = TSDataType.from_pandas_datatype(col_dtype) - - # If inferred type is STRING but dtype is object, check actual data to distinguish STRING vs BLOB - if ts_data_type == TSDataType.STRING and (col_dtype == 'object' or str(col_dtype) == "<class 'numpy.object_'>"): - column_series = dataframe[col_name] - first_valid_idx = column_series.first_valid_index() - if first_valid_idx is not None: - first_value = column_series[first_valid_idx] - if isinstance(first_value, bytes): - ts_data_type = TSDataType.BLOB - - # Determine category (TAG or FIELD) + ts_data_type = check_string_or_blob(ts_data_type, col_dtype, dataframe[col_name]) + if col_name.lower() in tag_columns_lower: category = ColumnCategory.TAG else: @@ -272,17 +263,12 @@ def to_tsfile(dataframe: pd.DataFrame, column_schemas.append(ColumnSchema(col_name, ts_data_type, category)) - # Create table schema table_schema = TableSchema(table_name, column_schemas) - # Prepare DataFrame for writing - # If time column is specified or found, rename it to 'time' if needed - # If no time column is found, write_dataframe will handle it by using index - df_to_write = dataframe.copy() if time_col_name is not None and time_col_name != 'time': - # Rename time column to 'time' for consistency - df_to_write = df_to_write.rename(columns={time_col_name: 'time'}) + df_to_write = dataframe.rename(columns={time_col_name: 'time'}) + else: + df_to_write = dataframe - # Write DataFrame to TsFile with TsFileTableWriter(file_path, table_schema) as writer: writer.write_dataframe(df_to_write) \ No newline at end of file
