This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch mlnode/test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3fe1b36a1acc20c0c7cea8b9774f1fc9147715a7
Merge: 3dcb848c81 0b8adb0159
Author: Minghui Liu <[email protected]>
AuthorDate: Tue Apr 4 17:11:31 2023 +0800

    Merge remote-tracking branch 'liuyong/mlnode/test' into mlnode/test
    
    # Conflicts:
    #       mlnode/iotdb/mlnode/client.py

 mlnode/iotdb/mlnode/algorithm/enums.py             |  3 ++-
 mlnode/iotdb/mlnode/algorithm/factory.py           |  4 +++-
 mlnode/iotdb/mlnode/algorithm/metric.py            |  4 ++--
 mlnode/iotdb/mlnode/client.py                      | 16 +++++++++-------
 mlnode/iotdb/mlnode/config.py                      |  4 ++--
 mlnode/iotdb/mlnode/data_access/factory.py         |  4 ++--
 mlnode/iotdb/mlnode/data_access/offline/dataset.py | 14 ++++++++------
 mlnode/iotdb/mlnode/data_access/offline/source.py  |  9 +++++----
 mlnode/iotdb/mlnode/parser.py                      | 10 ++++++----
 mlnode/iotdb/mlnode/storage.py                     |  6 +++---
 10 files changed, 42 insertions(+), 32 deletions(-)

diff --cc mlnode/iotdb/mlnode/client.py
index bf46846fe1,14b58e9d6e..d39156e0fd
--- a/mlnode/iotdb/mlnode/client.py
+++ b/mlnode/iotdb/mlnode/client.py
@@@ -16,8 -16,8 +16,10 @@@
  # under the License.
  #
  import time
+ from typing import List, Dict
+ 
 +import pandas as pd
 +from iotdb.mlnode import serde
  from thrift.protocol import TBinaryProtocol, TCompactProtocol
  from thrift.Thrift import TException
  from thrift.transport import TSocket, TTransport
@@@ -124,10 -124,10 +126,10 @@@ class DataNodeClient(object)
          self.__client = IMLNodeInternalRPCService.Client(protocol)
  
      def fetch_timeseries(self,
-                          query_expressions: list,
+                          query_expressions: List[str],
                           query_filter: str = None,
                           fetch_size: int = DEFAULT_FETCH_SIZE,
 -                         timeout: int = DEFAULT_TIMEOUT) -> TFetchTimeseriesResp:
 +                         timeout: int = DEFAULT_TIMEOUT) -> [int, bool, pd.DataFrame]:
          req = TFetchTimeseriesReq(
              queryExpressions=query_expressions,
              queryFilter=query_filter,
@@@ -137,30 -137,10 +139,30 @@@
          try:
              resp = self.__client.fetchTimeseries(req)
              verify_success(resp.status, "An error occurs when calling fetch_timeseries()")
 -            return resp
 -        except TTransport.TException as e:
 +
 +            if len(resp.tsDataset) == 0:
 +                raise RuntimeError(f'No data fetched with query filter: {query_filter}')
 +
 +            data = serde.convert_to_df(resp.columnNameList,
 +                                       resp.columnTypeList,
 +                                       resp.columnNameIndexMap,
 +                                       resp.tsDataset)
 +            if data.empty:
 +                raise RuntimeError(
 +                    f'Fetched empty data with query expressions: {query_expressions} and query filter: {query_filter}')
 +            return resp.queryId, resp.hasMoreData, data
 +        except Exception as e:
 +            logger.warn(
 +                f'Fail to fetch data with query expressions: {query_expressions} and query filter: {query_filter}')
              raise e
  
 +    def fetch_window_batch(self,
 +                           query_expressions: list,
 +                           query_filter: str = None,
 +                           fetch_size: int = DEFAULT_FETCH_SIZE,
-                            timeout: int = DEFAULT_TIMEOUT) -> [int, bool, list[pd.DataFrame]]:
++                           timeout: int = DEFAULT_TIMEOUT) -> [int, bool, List[pd.DataFrame]]:
 +        pass
 +
      def record_model_metrics(self,
                               model_id: str,
                               trial_id: str,

Reply via email to