This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch mlnode/test in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3fe1b36a1acc20c0c7cea8b9774f1fc9147715a7
Merge: 3dcb848c81 0b8adb0159
Author: Minghui Liu <[email protected]>
AuthorDate: Tue Apr 4 17:11:31 2023 +0800

    Merge remote-tracking branch 'liuyong/mlnode/test' into mlnode/test

    # Conflicts:
    #       mlnode/iotdb/mlnode/client.py

 mlnode/iotdb/mlnode/algorithm/enums.py             |  3 ++-
 mlnode/iotdb/mlnode/algorithm/factory.py           |  4 +++-
 mlnode/iotdb/mlnode/algorithm/metric.py            |  4 ++--
 mlnode/iotdb/mlnode/client.py                      | 16 +++++++++-------
 mlnode/iotdb/mlnode/config.py                      |  4 ++--
 mlnode/iotdb/mlnode/data_access/factory.py         |  4 ++--
 mlnode/iotdb/mlnode/data_access/offline/dataset.py | 14 ++++++++------
 mlnode/iotdb/mlnode/data_access/offline/source.py  |  9 +++++----
 mlnode/iotdb/mlnode/parser.py                      | 10 ++++++----
 mlnode/iotdb/mlnode/storage.py                     |  6 +++---
 10 files changed, 42 insertions(+), 32 deletions(-)

diff --cc mlnode/iotdb/mlnode/client.py
index bf46846fe1,14b58e9d6e..d39156e0fd
--- a/mlnode/iotdb/mlnode/client.py
+++ b/mlnode/iotdb/mlnode/client.py
@@@ -16,8 -16,8 +16,10 @@@
  # under the License.
# import time + from typing import List, Dict + +import pandas as pd +from iotdb.mlnode import serde from thrift.protocol import TBinaryProtocol, TCompactProtocol from thrift.Thrift import TException from thrift.transport import TSocket, TTransport @@@ -124,10 -124,10 +126,10 @@@ class DataNodeClient(object) self.__client = IMLNodeInternalRPCService.Client(protocol) def fetch_timeseries(self, - query_expressions: list, + query_expressions: List[str], query_filter: str = None, fetch_size: int = DEFAULT_FETCH_SIZE, - timeout: int = DEFAULT_TIMEOUT) -> TFetchTimeseriesResp: + timeout: int = DEFAULT_TIMEOUT) -> [int, bool, pd.DataFrame]: req = TFetchTimeseriesReq( queryExpressions=query_expressions, queryFilter=query_filter, @@@ -137,30 -137,10 +139,30 @@@ try: resp = self.__client.fetchTimeseries(req) verify_success(resp.status, "An error occurs when calling fetch_timeseries()") - return resp - except TTransport.TException as e: + + if len(resp.tsDataset) == 0: + raise RuntimeError(f'No data fetched with query filter: {query_filter}') + + data = serde.convert_to_df(resp.columnNameList, + resp.columnTypeList, + resp.columnNameIndexMap, + resp.tsDataset) + if data.empty: + raise RuntimeError( + f'Fetched empty data with query expressions: {query_expressions} and query filter: {query_filter}') + return resp.queryId, resp.hasMoreData, data + except Exception as e: + logger.warn( + f'Fail to fetch data with query expressions: {query_expressions} and query filter: {query_filter}') raise e + def fetch_window_batch(self, + query_expressions: list, + query_filter: str = None, + fetch_size: int = DEFAULT_FETCH_SIZE, - timeout: int = DEFAULT_TIMEOUT) -> [int, bool, list[pd.DataFrame]]: ++ timeout: int = DEFAULT_TIMEOUT) -> [int, bool, List[pd.DataFrame]]: + pass + def record_model_metrics(self, model_id: str, trial_id: str,
