This is an automated email from the ASF dual-hosted git repository.
cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new d3ffc705a3 [SYSTEMDS-3887] Multimodal HP-tuning fix
d3ffc705a3 is described below
commit d3ffc705a32857fce65e6d2071d4000ce11f3c99
Author: Christina Dionysio <[email protected]>
AuthorDate: Wed Jan 28 19:31:43 2026 +0100
[SYSTEMDS-3887] Multimodal HP-tuning fix
This patch fixes an issue in multimodal hyperparameter tuning. It
introduces the correct functionality to retrieve the TransformedModalities
with the latest representations already applied.
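
For reference, the tuner now namespaces hyperparameters per DAG node by
prefixing each parameter name with its node ID. The helper below is the one
added in hyperparameter_tuner.py; the node IDs and parameter values in the
usage example are hypothetical, for illustration only:

    # Recover the parameters that belong to one DAG node from the flat
    # search-space dict, whose keys have the form "<node_id>-<param_name>".
    def get_params_for_node(node_id, params):
        return {
            k.split("-")[-1]: v
            for k, v in params.items()
            if k.startswith(node_id + "-")
        }

    # Hypothetical node IDs and parameters:
    flat_params = {
        "node_0-window_size": 10,
        "node_0-aggregation_function": "mean",
        "node_1-num_windows": 100,
    }
    print(get_params_for_node("node_0", flat_params))
    # -> {'window_size': 10, 'aggregation_function': 'mean'}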
---
.../scuro/drsearch/hyperparameter_tuner.py | 283 ++++++++++++++-------
.../scuro/representations/window_aggregation.py | 12 +-
src/main/python/tests/scuro/data_generator.py | 1 +
src/main/python/tests/scuro/test_hp_tuner.py | 80 ++++--
4 files changed, 251 insertions(+), 125 deletions(-)
diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
index ed0eb5abde..0a129c8eb4 100644
--- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
+++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
@@ -19,27 +19,85 @@
#
# -------------------------------------------------------------
from typing import Dict, List, Tuple, Any, Optional
-from skopt import gp_minimize
from skopt.space import Real, Integer, Categorical
-from skopt.utils import use_named_args
-import json
+import numpy as np
import logging
from dataclasses import dataclass
import time
import copy
-
+from joblib import Parallel, delayed
+from skopt import Optimizer
+from systemds.scuro.drsearch.representation_dag import (
+ RepresentationDAGBuilder,
+)
from systemds.scuro.modality.modality import Modality
+from systemds.scuro.drsearch.task import PerformanceMeasure
+import pickle
+
+
+def get_params_for_node(node_id, params):
+ return {
+        k.split("-")[-1]: v for k, v in params.items() if k.startswith(node_id + "-")
+ }
@dataclass
class HyperparamResult:
-
representation_name: str
best_params: Dict[str, Any]
best_score: float
all_results: List[Tuple[Dict[str, Any], float]]
tuning_time: float
modality_id: int
+ task_name: str
+ dag: Any
+ mm_opt: bool = False
+
+
+class HyperparamResults:
+ def __init__(self, tasks, modalities):
+ self.tasks = tasks
+ self.modalities = modalities
+ self.results = {}
+ for task in tasks:
+ self.results[task.model.name] = {
+ modality.modality_id: [] for modality in modalities
+ }
+
+ def add_result(self, results):
+ # TODO: Check if order of best results matters (deterministic)
+ for result in results:
+ if result.mm_opt:
+ self.results[result.task_name]["mm_results"].append(result)
+ else:
+                self.results[result.task_name][result.modality_id].append(result)
+
+ def setup_mm(self, optimize_unimodal):
+ if not optimize_unimodal:
+ self.results = {}
+ for task in self.tasks:
+ self.results[task.model.name] = {"mm_results": []}
+
+ def get_k_best_results(self, modality, task, performance_metric_name):
+ results = self.results[task.model.name][modality.modality_id]
+ dags = []
+ for result in results:
+ dag_with_best_params = RepresentationDAGBuilder()
+ prev_node_id = None
+ for node in result.dag.nodes:
+ if node.operation is not None and node.parameters:
+                params = get_params_for_node(node.node_id, result.best_params)
+ prev_node_id = dag_with_best_params.create_operation_node(
+ node.operation, [prev_node_id], params
+ )
+ else: # it is a leaf node
+ prev_node_id = dag_with_best_params.create_leaf_node(
+ node.modality_id
+ )
+
+ dags.append(dag_with_best_params.build(prev_node_id))
+        representations = [list(dag.execute([modality]).values())[-1] for dag in dags]
+ return results, representations
class HyperparameterTuner:
@@ -57,16 +115,17 @@ class HyperparameterTuner:
debug: bool = False,
):
self.tasks = tasks
- self.optimization_results = optimization_results
+ self.unimodal_optimization_results = optimization_results
+ self.optimization_results = HyperparamResults(tasks, modalities)
self.n_jobs = n_jobs
self.scoring_metric = scoring_metric
self.maximize_metric = maximize_metric
self.save_results = save_results
- self.results = {}
self.k = k
self.modalities = modalities
self.representations = None
self.k_best_cache = None
+ self.k_best_cache_by_modality = None
self.k_best_representations = None
self.extract_k_best_modalities_per_task()
self.debug = debug
@@ -96,43 +155,45 @@ class HyperparameterTuner:
def extract_k_best_modalities_per_task(self):
self.k_best_representations = {}
self.k_best_cache = {}
+ self.k_best_cache_by_modality = {}
representations = {}
for task in self.tasks:
self.k_best_representations[task.model.name] = []
self.k_best_cache[task.model.name] = []
+ self.k_best_cache_by_modality[task.model.name] = {}
representations[task.model.name] = {}
for modality in self.modalities:
k_best_results, cached_data = (
- self.optimization_results.get_k_best_results(
+ self.unimodal_optimization_results.get_k_best_results(
modality, task, self.scoring_metric
)
)
representations[task.model.name][modality.modality_id] = k_best_results
+ self.k_best_cache_by_modality[task.model.name][
+ modality.modality_id
+ ] = cached_data
self.k_best_representations[task.model.name].extend(k_best_results)
self.k_best_cache[task.model.name].extend(cached_data)
self.representations = representations
def tune_unimodal_representations(self, max_eval_per_rep: Optional[int] = None):
- results = {}
for task in self.tasks:
- results[task.model.name] = []
- for representation in self.k_best_representations[task.model.name]:
- result = self.tune_dag_representation(
- representation.dag,
- representation.dag.root_node_id,
- task,
- max_eval_per_rep,
+ reps = self.k_best_representations[task.model.name]
+ self.optimization_results.add_result(
+ Parallel(n_jobs=self.n_jobs)(
+ delayed(self.tune_dag_representation)(
+ rep.dag, rep.dag.root_node_id, task, max_eval_per_rep
+ )
+ for rep in reps
)
- results[task.model.name].append(result)
-
- self.results = results
+ )
if self.save_results:
self.save_tuning_results()
- return results
-
- def tune_dag_representation(self, dag, root_node_id, task, max_evals=None):
+ def tune_dag_representation(
+ self, dag, root_node_id, task, max_evals=None, mm_opt=False
+ ):
hyperparams = {}
reps = []
modality_ids = []
@@ -149,7 +210,7 @@ class HyperparameterTuner:
visited.add(node_id)
if node.operation is not None:
if node.operation().parameters:
- hyperparams.update(node.operation().parameters)
+ hyperparams[node_id] = node.operation().parameters
reps.append(node.operation)
node_order.append(node_id)
if node.modality_id is not None:
@@ -161,99 +222,136 @@ class HyperparameterTuner:
return None
start_time = time.time()
- rep_name = "_".join([rep.__name__ for rep in reps])
+ rep_name = "-".join([rep.__name__ for rep in reps])
search_space = []
param_names = []
- for param_name, param_values in hyperparams.items():
- param_names.append(param_name)
- if isinstance(param_values, list):
- if all(isinstance(v, (int, float)) for v in param_values):
- if all(isinstance(v, int) for v in param_values):
+ for op_id, op_params in hyperparams.items():
+ for param_name, param_values in op_params.items():
+ param_names.append(op_id + "-" + param_name)
+ if isinstance(param_values, list):
+ search_space.append(
+                        Categorical(param_values, name=op_id + "-" + param_name)
+ )
+                elif isinstance(param_values, tuple) and len(param_values) == 2:
+ if isinstance(param_values[0], int) and isinstance(
+ param_values[1], int
+ ):
search_space.append(
Integer(
-                        min(param_values), max(param_values), name=param_name
+ param_values[0],
+ param_values[1],
+ name=op_id + "-" + param_name,
)
)
else:
search_space.append(
-                        Real(min(param_values), max(param_values), name=param_name)
+ Real(
+ param_values[0],
+ param_values[1],
+ name=op_id + "-" + param_name,
+ )
)
else:
-                search_space.append(Categorical(param_values, name=param_name))
- elif isinstance(param_values, tuple) and len(param_values) == 2:
- if isinstance(param_values[0], int) and isinstance(
- param_values[1], int
- ):
search_space.append(
-                        Integer(param_values[0], param_values[1], name=param_name)
+                        Categorical([param_values], name=op_id + "-" + param_name)
)
- else:
- search_space.append(
- Real(param_values[0], param_values[1], name=param_name)
- )
- else:
-                search_space.append(Categorical([param_values], name=param_name))
n_calls = max_evals if max_evals else 50
all_results = []
- @use_named_args(search_space)
- def objective(**params):
+ def evaluate_point(point):
+ params = dict(zip(param_names, point))
result = self.evaluate_dag_config(
- dag, params, node_order, modality_ids, task
+ dag,
+ params,
+ node_order,
+ modality_ids,
+ task,
+ modalities_override=(
+ self._get_cached_modalities_for_task(task, modality_ids)
+ if mm_opt
+ else None
+ ),
)
- all_results.append(result)
-
- score = result[1].average_scores[self.scoring_metric]
+ score = result[1]
+ if isinstance(score, PerformanceMeasure):
+ score = score.average_scores[self.scoring_metric]
if self.maximize_metric:
- return -score
+ objective_value = -score
else:
- return score
+ objective_value = score
+ return objective_value, result
- result = gp_minimize(
- objective,
- search_space,
- n_calls=n_calls,
- random_state=42,
- verbose=self.debug,
- n_initial_points=min(10, n_calls // 2),
+ opt = Optimizer(
+            search_space, random_state=42, n_initial_points=min(10, n_calls // 2)
)
- if self.maximize_metric:
- best_params, best_score = max(
-                all_results, key=lambda x: x[1].average_scores[self.scoring_metric]
+ n_batch = min(abs(self.n_jobs), n_calls) if self.n_jobs != 0 else 1
+ for _ in range(0, n_calls, n_batch):
+ points = opt.ask(n_points=n_batch)
+ results = Parallel(n_jobs=self.n_jobs)(
+ delayed(evaluate_point)(p) for p in points
)
+ objective_values = [result[0] for result in results]
+ all_results.extend(result[1] for result in results)
+ opt.tell(points, objective_values)
+
+ def get_score(result):
+ score = result[1]
+ if isinstance(score, PerformanceMeasure):
+ return score.average_scores[self.scoring_metric]
+ return score
+
+ if self.maximize_metric:
+ best_params, best_score = max(all_results, key=get_score)
else:
- best_params, best_score = min(
- all_results, key=lambda x:
x[1].average_scores[self.scoring_metric]
- )
+ best_params, best_score = min(all_results, key=get_score)
tuning_time = time.time() - start_time
- return HyperparamResult(
+ best_result = HyperparamResult(
representation_name=rep_name,
best_params=best_params,
best_score=best_score,
all_results=all_results,
tuning_time=tuning_time,
modality_id=modality_ids[0] if modality_ids else None,
+ task_name=task.model.name,
+ dag=dag,
+ mm_opt=mm_opt,
)
- def evaluate_dag_config(self, dag, params, node_order, modality_ids, task):
+ return best_result
+
+ def _get_cached_modalities_for_task(self, task, modality_ids):
+ if not self.k_best_cache_by_modality:
+ return self.get_modalities_by_id(modality_ids)
+ unique_modality_ids = list(dict.fromkeys(modality_ids))
+ cached_modalities = []
+ for modality_id in unique_modality_ids:
+ cached_modalities.extend(
+                self.k_best_cache_by_modality[task.model.name].get(modality_id, [])
+ )
+ return cached_modalities
+
+ def evaluate_dag_config(
+        self, dag, params, node_order, modality_ids, task, modalities_override=None
+ ):
try:
dag_copy = copy.deepcopy(dag)
for node_id in node_order:
node = dag_copy.get_node_by_id(node_id)
if node.operation is not None and node.parameters:
- node_params = {
- k: v for k, v in params.items() if k in node.parameters
- }
- node.parameters = node_params
+ node.parameters = get_params_for_node(node_id, params)
- modalities = self.get_modalities_by_id(modality_ids)
+ modalities = (
+ modalities_override
+ if modalities_override is not None
+ else self.get_modalities_by_id(modality_ids)
+ )
modified_modality = dag_copy.execute(modalities, task)
score = task.run(
modified_modality[list(modified_modality.keys())[-1]].data
@@ -262,7 +360,7 @@ class HyperparameterTuner:
return params, score
except Exception as e:
self.logger.error(f"Error evaluating DAG with params {params}: {e}")
-            return params, float("-inf") if self.maximize_metric else float("inf")
+ return params, np.nan
def tune_multimodal_representations(
self,
@@ -271,14 +369,25 @@ class HyperparameterTuner:
optimize_unimodal: bool = True,
max_eval_per_rep: Optional[int] = None,
):
- results = {}
+ self.optimization_results.setup_mm(optimize_unimodal)
for task in self.tasks:
+
+ def _get_metric_value(result):
+ score = result.val_score
+ if isinstance(score, PerformanceMeasure):
+ score = score.average_scores
+ if isinstance(score, dict):
+ return score.get(
+ self.scoring_metric,
+                        float("-inf") if self.maximize_metric else float("inf"),
+ )
+ return score
+
best_results = sorted(
optimization_results[task.model.name],
- key=lambda x: x.val_score,
- reverse=True,
+ key=_get_metric_value,
+ reverse=self.maximize_metric,
)[:k]
- results[task.model.name] = []
best_optimization_results = best_results
for representation in best_optimization_results:
@@ -314,32 +423,18 @@ class HyperparameterTuner:
representation.dag.root_node_id,
task,
max_eval_per_rep,
+ mm_opt=True,
)
- results[task.model.name].append(result)
-
- self.results = results
-
+ self.optimization_results.add_result([result])
if self.save_results:
self.save_tuning_results()
- return results
-
def save_tuning_results(self, filepath: str = None):
if not filepath:
filepath = f"hyperparameter_results_{int(time.time())}.json"
- json_results = {}
- for task in self.results.keys():
- for result in self.results[task]:
- json_results[result.representation_name] = {
- "best_params": result.best_params,
- "best_score": result.best_score,
- "tuning_time": result.tuning_time,
- "num_evaluations": len(result.all_results),
- }
-
- with open(filepath, "w") as f:
- json.dump(json_results, f, indent=2)
+ with open(filepath, "wb") as f:
+ pickle.dump(self.optimization_results.results, f)
if self.debug:
self.logger.info(f"Results saved to {filepath}")
diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py
index 4d4ec19c5b..039387eb01 100644
--- a/src/main/python/systemds/scuro/representations/window_aggregation.py
+++ b/src/main/python/systemds/scuro/representations/window_aggregation.py
@@ -59,7 +59,9 @@ class Window(Context):
self._aggregation_function = Aggregation(value)
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+ [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
class WindowAggregation(Window):
def __init__(self, aggregation_function="mean", window_size=10, pad=False):
super().__init__("WindowAggregation", aggregation_function)
@@ -167,7 +169,9 @@ class WindowAggregation(Window):
return np.array(result)
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+ [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
class StaticWindow(Window):
def __init__(self, aggregation_function="mean", num_windows=100):
super().__init__("StaticWindow", aggregation_function)
@@ -198,7 +202,9 @@ class StaticWindow(Window):
return np.array(windowed_data)
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+ [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
class DynamicWindow(Window):
def __init__(self, aggregation_function="mean", num_windows=100):
super().__init__("DynamicWindow", aggregation_function)
diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py
index ae78c50b8a..cfa77b1dd6 100644
--- a/src/main/python/tests/scuro/data_generator.py
+++ b/src/main/python/tests/scuro/data_generator.py
@@ -62,6 +62,7 @@ class TestDataLoader(BaseLoader):
class ModalityRandomDataGenerator:
def __init__(self):
+ np.random.seed(4)
self.modality_id = 0
self.modality_type = None
self.metadata = {}
diff --git a/src/main/python/tests/scuro/test_hp_tuner.py b/src/main/python/tests/scuro/test_hp_tuner.py
index 73c498e236..de7c8f0217 100644
--- a/src/main/python/tests/scuro/test_hp_tuner.py
+++ b/src/main/python/tests/scuro/test_hp_tuner.py
@@ -78,31 +78,31 @@ class TestHPTuner(unittest.TestCase):
self.run_hp_for_modality([audio])
- # def test_multimodal_hp_tuning(self):
-    # audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
- # self.num_instances, 3000
- # )
- # audio = UnimodalModality(
- # TestDataLoader(
-    #         self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md
- # )
- # )
- #
- # text_data, text_md = ModalityRandomDataGenerator().create_text_data(
- # self.num_instances
- # )
- # text = UnimodalModality(
- # TestDataLoader(
-    #         self.indices, None, ModalityType.TEXT, text_data, str, text_md
- # )
- # )
- #
- # self.run_hp_for_modality(
-    #     [audio, text], multimodal=True, tune_unimodal_representations=True
- # )
- # self.run_hp_for_modality(
-    #     [audio, text], multimodal=True, tune_unimodal_representations=False
- # )
+ def test_multimodal_hp_tuning(self):
+ audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
+ self.num_instances, 3000
+ )
+ audio = UnimodalModality(
+ TestDataLoader(
+                self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md
+ )
+ )
+
+ text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+ self.num_instances
+ )
+ text = UnimodalModality(
+ TestDataLoader(
+ self.indices, None, ModalityType.TEXT, text_data, str, text_md
+ )
+ )
+
+ # self.run_hp_for_modality(
+        #     [audio, text], multimodal=True, tune_unimodal_representations=True
+ # )
+ self.run_hp_for_modality(
+ [audio, text], multimodal=True, tune_unimodal_representations=False
+ )
def test_hp_tuner_for_text_modality(self):
text_data, text_md = ModalityRandomDataGenerator().create_text_data(
@@ -130,7 +130,7 @@ class TestHPTuner(unittest.TestCase):
},
):
registry = Registry()
- registry._fusion_operators = [Average, Concatenation, LSTM]
+ registry._fusion_operators = [LSTM]
unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, False)
unimodal_optimizer.optimize()
@@ -159,8 +159,32 @@ class TestHPTuner(unittest.TestCase):
else:
hp.tune_unimodal_representations(max_eval_per_rep=10)
- assert len(hp.results) == len(self.tasks)
- assert len(hp.results[self.tasks[0].model.name]) == 2
+ assert len(hp.optimization_results.results) == len(self.tasks)
+ if multimodal:
+ if tune_unimodal_representations:
+ assert (
+ len(
+                        hp.optimization_results.results[self.tasks[0].model.name][0]
+ )
+ == 1
+ )
+ else:
+ assert (
+ len(
+                        hp.optimization_results.results[self.tasks[0].model.name][
+ "mm_results"
+ ]
+ )
+ == 1
+ )
+ else:
+ assert (
+                len(hp.optimization_results.results[self.tasks[0].model.name]) == 1
+ )
+ assert (
+                len(hp.optimization_results.results[self.tasks[0].model.name][0])
+ == 2
+ )
if __name__ == "__main__":
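
For context, the patch also replaces skopt's gp_minimize with a batched
ask/tell loop so candidate configurations can be evaluated in parallel via
joblib. A simplified, self-contained sketch of that pattern follows; the
search space and objective here are stand-ins, not the tuner's real ones:

    from joblib import Parallel, delayed
    from skopt import Optimizer
    from skopt.space import Integer, Real

    search_space = [Integer(2, 50, name="window_size"), Real(0.0, 1.0, name="alpha")]

    def objective(point):
        # Toy objective to minimize; the tuner instead evaluates a
        # representation DAG and negates the score when maximizing.
        window_size, alpha = point
        return (window_size - 10) ** 2 + alpha

    n_calls, n_jobs = 20, 4
    opt = Optimizer(search_space, random_state=42, n_initial_points=min(10, n_calls // 2))
    for _ in range(0, n_calls, n_jobs):
        points = opt.ask(n_points=n_jobs)  # propose a batch of candidates
        values = Parallel(n_jobs=n_jobs)(delayed(objective)(p) for p in points)
        opt.tell(points, values)  # feed the observations back

    best_point, best_value = min(zip(opt.Xi, opt.yi), key=lambda xy: xy[1])
    print("best:", best_point, best_value)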