This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f7d7f68aff7 [AINode] Sync codes for ainode (#17139)
f7d7f68aff7 is described below
commit f7d7f68aff7fc64672142a3275b4259c231df154
Author: Leo <[email protected]>
AuthorDate: Fri Feb 6 08:57:59 2026 +0800
[AINode] Sync codes for ainode (#17139)
Co-authored-by: Liu Zhengyun <[email protected]>
---
iotdb-core/ainode/iotdb/ainode/core/config.py | 15 ---
iotdb-core/ainode/iotdb/ainode/core/constant.py | 1 -
.../core/inference/inference_request_pool.py | 4 +-
.../core/inference/pipeline/basic_pipeline.py | 74 +++++++++++--
.../ainode/iotdb/ainode/core/ingress/iotdb.py | 8 --
.../iotdb/ainode/core/manager/inference_manager.py | 116 ++++++++++++---------
.../ainode/resources/conf/iotdb-ainode.properties | 4 -
.../thrift-ainode/src/main/thrift/ainode.thrift | 3 +-
8 files changed, 135 insertions(+), 90 deletions(-)
diff --git a/iotdb-core/ainode/iotdb/ainode/core/config.py
b/iotdb-core/ainode/iotdb/ainode/core/config.py
index b14efa3bedf..4995dda7bf3 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/config.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/config.py
@@ -23,7 +23,6 @@ from iotdb.ainode.core.constant import (
AINODE_CLUSTER_INGRESS_ADDRESS,
AINODE_CLUSTER_INGRESS_PASSWORD,
AINODE_CLUSTER_INGRESS_PORT,
- AINODE_CLUSTER_INGRESS_TIME_ZONE,
AINODE_CLUSTER_INGRESS_USERNAME,
AINODE_CLUSTER_NAME,
AINODE_CONF_DIRECTORY_NAME,
@@ -69,7 +68,6 @@ class AINodeConfig(object):
self._ain_cluster_ingress_port = AINODE_CLUSTER_INGRESS_PORT
self._ain_cluster_ingress_username = AINODE_CLUSTER_INGRESS_USERNAME
self._ain_cluster_ingress_password = AINODE_CLUSTER_INGRESS_PASSWORD
- self._ain_cluster_ingress_time_zone = AINODE_CLUSTER_INGRESS_TIME_ZONE
# Inference configuration
self._ain_inference_batch_interval_in_ms: int = (
@@ -287,14 +285,6 @@ class AINodeConfig(object):
) -> None:
self._ain_cluster_ingress_password = ain_cluster_ingress_password
- def get_ain_cluster_ingress_time_zone(self) -> str:
- return self._ain_cluster_ingress_time_zone
-
- def set_ain_cluster_ingress_time_zone(
- self, ain_cluster_ingress_time_zone: str
- ) -> None:
- self._ain_cluster_ingress_time_zone = ain_cluster_ingress_time_zone
-
@singleton
class AINodeDescriptor(object):
@@ -432,11 +422,6 @@ class AINodeDescriptor(object):
file_configs["ain_cluster_ingress_password"]
)
- if "ain_cluster_ingress_time_zone" in config_keys:
- self._config.set_ain_cluster_ingress_time_zone(
- file_configs["ain_cluster_ingress_time_zone"]
- )
-
except BadNodeUrlException:
logger.warning("Cannot load AINode conf file, use default
configuration.")
diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py
b/iotdb-core/ainode/iotdb/ainode/core/constant.py
index b0019722630..4a2ee543d1f 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/constant.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py
@@ -39,7 +39,6 @@ AINODE_CLUSTER_INGRESS_ADDRESS = "127.0.0.1"
AINODE_CLUSTER_INGRESS_PORT = 6667
AINODE_CLUSTER_INGRESS_USERNAME = "root"
AINODE_CLUSTER_INGRESS_PASSWORD = "root"
-AINODE_CLUSTER_INGRESS_TIME_ZONE = "UTC+8"
# RPC config
AINODE_THRIFT_COMPRESSION_ENABLED = False
diff --git
a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
index dcfa4528fce..8121d4fecd8 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
@@ -127,7 +127,9 @@ class InferenceRequestPool(mp.Process):
for i in range(batch_inputs.size(0)):
batch_input_list.append({"targets": batch_inputs[i]})
batch_inputs = self._inference_pipeline.preprocess(
- batch_input_list, output_length=requests[0].output_length
+ batch_input_list,
+ output_length=requests[0].output_length,
+ auto_adapt=True,
)
if isinstance(self._inference_pipeline, ForecastPipeline):
batch_output = self._inference_pipeline.forecast(
diff --git
a/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py
b/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py
index ece395bf697..5d0026522a1 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py
@@ -19,13 +19,16 @@
from abc import ABC, abstractmethod
import torch
+from torch.nn import functional as F
from iotdb.ainode.core.exception import InferenceModelInternalException
+from iotdb.ainode.core.log import Logger
from iotdb.ainode.core.manager.device_manager import DeviceManager
from iotdb.ainode.core.model.model_info import ModelInfo
from iotdb.ainode.core.model.model_loader import load_model
BACKEND = DeviceManager()
+logger = Logger()
class BasicPipeline(ABC):
@@ -70,6 +73,7 @@ class ForecastPipeline(BasicPipeline):
infer_kwargs (dict, optional): Additional keyword arguments for
inference, such as:
- `output_length`(int): Used to check validation of
'future_covariates' if provided.
+ - `auto_adapt`(bool): Whether to automatically adapt the
covariates.
Raises:
ValueError: If the input format is incorrect (e.g., missing keys,
invalid tensor shapes).
@@ -80,6 +84,7 @@ class ForecastPipeline(BasicPipeline):
if isinstance(inputs, list):
output_length = infer_kwargs.get("output_length", 96)
+ auto_adapt = infer_kwargs.get("auto_adapt", True)
for idx, input_dict in enumerate(inputs):
# Check if the dictionary contains the expected keys
if not isinstance(input_dict, dict):
@@ -121,10 +126,30 @@ class ForecastPipeline(BasicPipeline):
raise ValueError(
f"Each value in 'past_covariates' must be
torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
)
- if cov_value.ndim != 1 or cov_value.shape[0] !=
input_length:
+ if cov_value.ndim != 1:
raise ValueError(
- f"Each covariate in 'past_covariates' must have
shape ({input_length},), but got shape {cov_value.shape} for key '{cov_key}' at
index {idx}."
+ f"Individual `past_covariates` must be 1-d, found:
{cov_key} with {cov_value.ndim} dimensions in element at index {idx}."
)
+ # If any past_covariate's length is not equal to
input_length, process it accordingly.
+ if cov_value.shape[0] != input_length:
+ if auto_adapt:
+ if cov_value.shape[0] > input_length:
+ logger.warning(
+ f"Past covariate {cov_key} at index {idx}
has length {cov_value.shape[0]} (> {input_length}), which will be truncated
from the beginning."
+ )
+ past_covariates[cov_key] =
cov_value[-input_length:]
+ else:
+ logger.warning(
+ f"Past covariate {cov_key} at index {idx}
has length {cov_value.shape[0]} (< {input_length}), which will be padded with
zeros at the beginning."
+ )
+ pad_size = input_length - cov_value.shape[0]
+ past_covariates[cov_key] = F.pad(
+ cov_value, (pad_size, 0)
+ )
+ else:
+ raise ValueError(
+ f"Individual `past_covariates` must be 1-d
with length equal to the length of `target` (= {input_length}), found:
{cov_key} with shape {tuple(cov_value.shape)} in element at index {idx}."
+ )
# Check 'future_covariates' if it exists (optional)
future_covariates = input_dict.get("future_covariates", {})
@@ -134,19 +159,52 @@ class ForecastPipeline(BasicPipeline):
)
# If future_covariates exists, check if they are a subset of
past_covariates
if future_covariates:
- for cov_key, cov_value in future_covariates.items():
+ for cov_key, cov_value in list(future_covariates.items()):
+ # If any future_covariate not found in
past_covariates, ignore it or raise an error.
if cov_key not in past_covariates:
- raise ValueError(
- f"Key '{cov_key}' in 'future_covariates' is
not in 'past_covariates' at index {idx}."
- )
+ if auto_adapt:
+ future_covariates.pop(cov_key)
+ logger.warning(
+ f"Future covariate {cov_key} not found in
past_covariates {list(past_covariates.keys())}, which will be ignored when
executing forecasting."
+ )
+ if not future_covariates:
+ input_dict.pop("future_covariates")
+ continue
+ else:
+ raise ValueError(
+ f"Expected keys in `future_covariates` to
be a subset of `past_covariates` {list(past_covariates.keys())}, "
+ f"but found {cov_key} in element at index
{idx}."
+ )
if not isinstance(cov_value, torch.Tensor):
raise ValueError(
f"Each value in 'future_covariates' must be
torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
)
- if cov_value.ndim != 1 or cov_value.shape[0] !=
output_length:
+ if cov_value.ndim != 1:
raise ValueError(
- f"Each covariate in 'future_covariates' must
have shape ({output_length},), but got shape {cov_value.shape} for key
'{cov_key}' at index {idx}."
+ f"Individual `future_covariates` must be 1-d,
found: {cov_key} with {cov_value.ndim} dimensions in element at index {idx}."
)
+ # If any future_covariate's length is not equal to
output_length, process it accordingly.
+ if cov_value.shape[0] != output_length:
+ if auto_adapt:
+ if cov_value.shape[0] > output_length:
+ logger.warning(
+ f"Future covariate {cov_key} at index
{idx} has length {cov_value.shape[0]} (> {output_length}), which will be
truncated from the end."
+ )
+ future_covariates[cov_key] = cov_value[
+ :output_length
+ ]
+ else:
+ logger.warning(
+ f"Future covariate {cov_key} at index
{idx} has length {cov_value.shape[0]} (< {output_length}), which will be padded
with zeros at the end."
+ )
+ pad_size = output_length -
cov_value.shape[0]
+ future_covariates[cov_key] = F.pad(
+ cov_value, (0, pad_size)
+ )
+ else:
+ raise ValueError(
+ f"Individual `future_covariates` must be
1-d with length equal to `output_length` (= {output_length}), found: {cov_key}
with shape {tuple(cov_value.shape)} in element at index {idx}."
+ )
else:
raise ValueError(
f"The inputs must be a list of dictionaries, but got
{type(inputs)}."
diff --git a/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
b/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
index 13c56ca9d2d..be1dd9bf1c2 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
@@ -69,9 +69,6 @@ class IoTDBTreeModelDataset(BasicDatabaseForecastDataset):
password: str = AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_password(),
- time_zone: str = AINodeDescriptor()
- .get_config()
- .get_ain_cluster_ingress_time_zone(),
use_rate: float = 1.0,
offset_rate: float = 0.0,
):
@@ -90,7 +87,6 @@ class IoTDBTreeModelDataset(BasicDatabaseForecastDataset):
node_urls=[f"{ip}:{port}"],
user=username,
password=password,
- zone_id=time_zone,
use_ssl=AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_ssl_enabled(),
@@ -258,9 +254,6 @@ class IoTDBTableModelDataset(BasicDatabaseForecastDataset):
password: str = AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_password(),
- time_zone: str = AINodeDescriptor()
- .get_config()
- .get_ain_cluster_ingress_time_zone(),
use_rate: float = 1.0,
offset_rate: float = 0.0,
):
@@ -272,7 +265,6 @@ class IoTDBTableModelDataset(BasicDatabaseForecastDataset):
node_urls=[f"{ip}:{port}"],
username=username,
password=password,
- time_zone=time_zone,
use_ssl=AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_ssl_enabled(),
diff --git a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
index 2ad25ad0529..07ca8a63bce 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
@@ -175,6 +175,66 @@ class InferenceManager:
with self._result_wrapper_lock:
del self._result_wrapper_map[req_id]
+ def _do_inference_and_construct_resp(
+ self,
+ model_id: str,
+ model_inputs_list: list[dict[str, torch.Tensor | dict[str,
torch.Tensor]]],
+ output_length: int,
+ inference_attrs: dict,
+ **kwargs,
+ ) -> list[bytes]:
+ auto_adapt = kwargs.get("auto_adapt", True)
+ if (
+ output_length
+ >
AINodeDescriptor().get_config().get_ain_inference_max_output_length()
+ ):
+ raise NumericalRangeException(
+ "output_length",
+ output_length,
+ 1,
+
AINodeDescriptor().get_config().get_ain_inference_max_output_length(),
+ )
+
+ if self._pool_controller.has_running_pools(model_id):
+ infer_req = InferenceRequest(
+ req_id=generate_req_id(),
+ model_id=model_id,
+ inputs=torch.stack(
+ [data["targets"] for data in model_inputs_list], dim=0
+ ),
+ output_length=output_length,
+ )
+ outputs = self._process_request(infer_req)
+ else:
+ model_info = self._model_manager.get_model_info(model_id)
+ inference_pipeline = load_pipeline(
+ model_info, device=self._backend.torch_device("cpu")
+ )
+ inputs = inference_pipeline.preprocess(
+ model_inputs_list,
+ output_length=output_length,
+ auto_adapt=auto_adapt,
+ )
+ if isinstance(inference_pipeline, ForecastPipeline):
+ outputs = inference_pipeline.forecast(
+ inputs, output_length=output_length, **inference_attrs
+ )
+ elif isinstance(inference_pipeline, ClassificationPipeline):
+ outputs = inference_pipeline.classify(inputs)
+ elif isinstance(inference_pipeline, ChatPipeline):
+ outputs = inference_pipeline.chat(inputs)
+ else:
+ outputs = None
+ logger.error("[Inference] Unsupported pipeline type.")
+ outputs = inference_pipeline.postprocess(outputs)
+
+ # convert tensor into tsblock for the output in each batch
+ resp_list = []
+ for batch_idx, output in enumerate(outputs):
+ resp = convert_tensor_to_tsblock(output)
+ resp_list.append(resp)
+ return resp_list
+
def _run(
self,
req,
@@ -191,65 +251,17 @@ class InferenceManager:
inference_attrs = extract_attrs(req)
output_length = int(inference_attrs.pop("output_length", 96))
- # model_inputs_list: Each element is a dict, which contains the
following keys:
- # `targets`: The input tensor for the target variable(s), whose
shape is [target_count, input_length].
model_inputs_list: list[
dict[str, torch.Tensor | dict[str, torch.Tensor]]
] = [{"targets": inputs[0]}]
- if (
- output_length
- >
AINodeDescriptor().get_config().get_ain_inference_max_output_length()
- ):
- raise NumericalRangeException(
- "output_length",
- output_length,
- 1,
- AINodeDescriptor()
- .get_config()
- .get_ain_inference_max_output_length(),
- )
-
- if self._pool_controller.has_running_pools(model_id):
- infer_req = InferenceRequest(
- req_id=generate_req_id(),
- model_id=model_id,
- inputs=torch.stack(
- [data["targets"] for data in model_inputs_list], dim=0
- ),
- output_length=output_length,
- )
- outputs = self._process_request(infer_req)
- else:
- model_info = self._model_manager.get_model_info(model_id)
- inference_pipeline = load_pipeline(
- model_info, device=self._backend.torch_device("cpu")
- )
- inputs = inference_pipeline.preprocess(
- model_inputs_list, output_length=output_length
- )
- if isinstance(inference_pipeline, ForecastPipeline):
- outputs = inference_pipeline.forecast(
- inputs, output_length=output_length, **inference_attrs
- )
- elif isinstance(inference_pipeline, ClassificationPipeline):
- outputs = inference_pipeline.classify(inputs)
- elif isinstance(inference_pipeline, ChatPipeline):
- outputs = inference_pipeline.chat(inputs)
- else:
- outputs = None
- logger.error("[Inference] Unsupported pipeline type.")
- outputs = inference_pipeline.postprocess(outputs)
-
- # convert tensor into tsblock for the output in each batch
- output_list = []
- for batch_idx, output in enumerate(outputs):
- output = convert_tensor_to_tsblock(output)
- output_list.append(output)
+ resp_list = self._do_inference_and_construct_resp(
+ model_id, model_inputs_list, output_length, inference_attrs
+ )
return resp_cls(
get_status(TSStatusCode.SUCCESS_STATUS),
- [output_list[0]] if single_batch else output_list,
+ [resp_list[0]] if single_batch else resp_list,
)
except Exception as e:
diff --git a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
index fc569b27807..5b653d678a6 100644
--- a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
+++ b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
@@ -52,10 +52,6 @@ ain_cluster_ingress_username=root
# Datatype: String
ain_cluster_ingress_password=root
-# The time zone of the IoTDB cluster.
-# Datatype: String
-ain_cluster_ingress_time_zone=UTC+8
-
# The device space allocated for inference
# Datatype: Float
ain_inference_memory_usage_ratio=0.2
diff --git a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
index 68347b89203..0416f3c69cb 100644
--- a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
+++ b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
@@ -87,7 +87,8 @@ struct TForecastReq {
3: required i32 outputLength
4: optional string historyCovs
5: optional string futureCovs
- 6: optional map<string, string> options
+ 6: optional bool autoAdapt
+ 7: optional map<string, string> options
}
struct TForecastResp {