This is an automated email from the ASF dual-hosted git repository. cdionysio pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 5dfa26f235 [SYSTEMDS-3835] Add additional context operators 5dfa26f235 is described below commit 5dfa26f23583f399187e5b3d785bf653a25ee8d3 Author: Christina Dionysio <diony...@tu-berlin.de> AuthorDate: Wed Sep 3 10:00:13 2025 +0200 [SYSTEMDS-3835] Add additional context operators This patch adds two additional context operators to Scuro. The first one is a StaticWindow operator that, given a number of desired windows, defines the suitable window size and aggregates a sequence into num_windows features. The second context operator is a DynamicWindow where a sequence is also aggregated into num_windows features with the difference that the window size for more recent data points is smaller than the window size for older data points in a time series. --- src/main/python/systemds/scuro/__init__.py | 8 +- .../systemds/scuro/drsearch/unimodal_optimizer.py | 4 +- .../systemds/scuro/representations/fusion.py | 2 + .../scuro/representations/window_aggregation.py | 107 +++++++++++++++++---- .../python/tests/scuro/test_operator_registry.py | 12 ++- .../python/tests/scuro/test_unimodal_optimizer.py | 2 +- .../python/tests/scuro/test_window_operations.py | 53 ++++++++-- 7 files changed, 159 insertions(+), 29 deletions(-) diff --git a/src/main/python/systemds/scuro/__init__.py b/src/main/python/systemds/scuro/__init__.py index b2a5e9df37..8e83c865a2 100644 --- a/src/main/python/systemds/scuro/__init__.py +++ b/src/main/python/systemds/scuro/__init__.py @@ -55,7 +55,11 @@ from systemds.scuro.representations.swin_video_transformer import SwinVideoTrans from systemds.scuro.representations.tfidf import TfIdf from systemds.scuro.representations.unimodal import UnimodalRepresentation from systemds.scuro.representations.wav2vec import Wav2Vec -from systemds.scuro.representations.window_aggregation import WindowAggregation +from systemds.scuro.representations.window_aggregation import ( + WindowAggregation, + 
DynamicWindow, + StaticWindow, +) from systemds.scuro.representations.word2vec import W2V from systemds.scuro.representations.x3d import X3D from systemds.scuro.models.model import Model @@ -145,4 +149,6 @@ __all__ = [ "RMSE", "Spectral", "AttentionFusion", + "DynamicWindow", + "StaticWindow", ] diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py index b84d86d94d..86c7ce1e63 100644 --- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py @@ -122,8 +122,8 @@ class UnimodalOptimizer: for context_operator_after in context_operators: con_op_after = context_operator_after() - mod = mod.context(con_op_after) - self._evaluate_local(mod, [mod_op, con_op_after], local_results) + mod_con = mod.context(con_op_after) + self._evaluate_local(mod_con, [mod_op, con_op_after], local_results) return local_results diff --git a/src/main/python/systemds/scuro/representations/fusion.py b/src/main/python/systemds/scuro/representations/fusion.py index 61988abba2..ea614ac095 100644 --- a/src/main/python/systemds/scuro/representations/fusion.py +++ b/src/main/python/systemds/scuro/representations/fusion.py @@ -105,6 +105,8 @@ class Fusion(Representation): curr_shape = modalities[idx].data[0].shape if len(modalities[idx - 1].data) != len(modalities[idx].data): raise f"Modality sizes don't match!" + elif len(curr_shape) == 1: + continue elif curr_shape[1] > max_size: max_size = curr_shape[1] diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py index 167f4adafe..b3ad9e1b93 100644 --- a/src/main/python/systemds/scuro/representations/window_aggregation.py +++ b/src/main/python/systemds/scuro/representations/window_aggregation.py @@ -18,6 +18,8 @@ # under the License. 
# # ------------------------------------------------------------- +import copy + import numpy as np import math @@ -28,17 +30,13 @@ from systemds.scuro.representations.aggregate import Aggregation from systemds.scuro.representations.context import Context -@register_context_operator() -class WindowAggregation(Context): - def __init__(self, window_size=10, aggregation_function="mean", pad=True): +class Window(Context): + def __init__(self, name, aggregation_function): parameters = { - "window_size": [window_size], "aggregation_function": list(Aggregation().get_aggregation_functions()), - } # TODO: window_size should be dynamic and adapted to the shape of the data - super().__init__("WindowAggregation", parameters) - self.window_size = window_size + } + super().__init__(name, parameters) self.aggregation_function = aggregation_function - self.pad = pad @property def aggregation_function(self): @@ -48,6 +46,15 @@ class WindowAggregation(Context): def aggregation_function(self, value): self._aggregation_function = Aggregation(value) + +@register_context_operator() +class WindowAggregation(Window): + def __init__(self, window_size=10, aggregation_function="mean", pad=False): + super().__init__("WindowAggregation", aggregation_function) + self.parameters["window_size"] = [window_size] + self.window_size = window_size + self.pad = pad + def execute(self, modality): windowed_data = [] original_lengths = [] @@ -107,24 +114,90 @@ class WindowAggregation(Context): def window_aggregate_single_level(self, instance, new_length): if isinstance(instance, str): return instance - instance = np.array(instance) - num_cols = instance.shape[1] if instance.ndim > 1 else 1 - result = np.empty((new_length, num_cols)) + instance = np.array(copy.deepcopy(instance)) + + result = [] for i in range(0, new_length): - result[i] = self.aggregation_function.aggregate_instance( - instance[i * self.window_size : i * self.window_size + self.window_size] + result.append( + 
self.aggregation_function.aggregate_instance( + instance[ + i * self.window_size : i * self.window_size + self.window_size + ] + ) ) - if num_cols == 1: - result = result.reshape(-1) - return result + return np.array(result) def window_aggregate_nested_level(self, instance, new_length): result = [[] for _ in range(0, new_length)] - data = np.stack(instance) + data = np.stack(copy.deepcopy(instance)) for i in range(0, new_length): result[i] = self.aggregation_function.aggregate_instance( data[i * self.window_size : i * self.window_size + self.window_size] ) return np.array(result) + + +@register_context_operator() +class StaticWindow(Window): + def __init__(self, num_windows=100, aggregation_function="mean"): + super().__init__("StaticWindow", aggregation_function) + self.parameters["num_windows"] = [num_windows] + self.num_windows = num_windows + + def execute(self, modality): + windowed_data = [] + + for instance in modality.data: + window_size = len(instance) // self.num_windows + remainder = len(instance) % self.num_windows + output = [] + start = 0 + for i in range(0, self.num_windows): + extra = 1 if i < remainder else 0 + end = start + window_size + extra + window = copy.deepcopy(instance[start:end]) + val = ( + self.aggregation_function.aggregate_instance(window) + if len(window) > 0 + else np.zeros_like(output[i - 1]) + ) + output.append(val) + start = end + + windowed_data.append(output) + return np.array(windowed_data) + + +@register_context_operator() +class DynamicWindow(Window): + def __init__(self, num_windows=100, aggregation_function="mean"): + super().__init__("DynamicWindow", aggregation_function) + self.parameters["num_windows"] = [num_windows] + self.num_windows = num_windows + + def execute(self, modality): + windowed_data = [] + + for instance in modality.data: + N = len(instance) + weights = np.geomspace(4, 256, num=self.num_windows) + weights = weights / np.sum(weights) + window_sizes = (weights * N).astype(int) + window_sizes[-1] += N - 
np.sum(window_sizes) + indices = np.cumsum(window_sizes) + output = [] + start = 0 + for end in indices: + window = copy.deepcopy(instance[start:end]) + val = ( + self.aggregation_function.aggregate_instance(window) + if len(window) > 0 + else np.zeros_like(instance[0]) + ) + output.append(val) + start = end + windowed_data.append(output) + + return np.array(windowed_data) diff --git a/src/main/python/tests/scuro/test_operator_registry.py b/src/main/python/tests/scuro/test_operator_registry.py index a6941fe618..0d83d83bda 100644 --- a/src/main/python/tests/scuro/test_operator_registry.py +++ b/src/main/python/tests/scuro/test_operator_registry.py @@ -30,7 +30,11 @@ from systemds.scuro.representations.covarep_audio_features import ( from systemds.scuro.representations.mfcc import MFCC from systemds.scuro.representations.swin_video_transformer import SwinVideoTransformer from systemds.scuro.representations.wav2vec import Wav2Vec -from systemds.scuro.representations.window_aggregation import WindowAggregation +from systemds.scuro.representations.window_aggregation import ( + WindowAggregation, + StaticWindow, + DynamicWindow, +) from systemds.scuro.representations.bow import BoW from systemds.scuro.representations.word2vec import W2V from systemds.scuro.representations.tfidf import TfIdf @@ -83,7 +87,11 @@ class TestOperatorRegistry(unittest.TestCase): def test_context_operator_in_registry(self): registry = Registry() - assert registry.get_context_operators() == [WindowAggregation] + assert registry.get_context_operators() == [ + WindowAggregation, + StaticWindow, + DynamicWindow, + ] # def test_fusion_operator_in_registry(self): # registry = Registry() diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py b/src/main/python/tests/scuro/test_unimodal_optimizer.py index b5d2b266f6..a4952d29f9 100644 --- a/src/main/python/tests/scuro/test_unimodal_optimizer.py +++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py @@ -141,7 +141,7 @@ class 
TestUnimodalRepresentationOptimizer(unittest.TestCase): def test_unimodal_optimizer_for_audio_modality(self): audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( - self.num_instances, 100 + self.num_instances, 3000 ) audio = UnimodalModality( TestDataLoader( diff --git a/src/main/python/tests/scuro/test_window_operations.py b/src/main/python/tests/scuro/test_window_operations.py index ea1b0f46f2..9aab25a814 100644 --- a/src/main/python/tests/scuro/test_window_operations.py +++ b/src/main/python/tests/scuro/test_window_operations.py @@ -24,8 +24,13 @@ import math import numpy as np -from tests.scuro.data_generator import ModalityRandomDataGenerator +from tests.scuro.data_generator import ModalityRandomDataGenerator, TestDataLoader from systemds.scuro.modality.type import ModalityType +from systemds.scuro.modality.unimodal_modality import UnimodalModality +from systemds.scuro.representations.window_aggregation import ( + StaticWindow, + DynamicWindow, +) class TestWindowOperations(unittest.TestCase): @@ -35,20 +40,56 @@ class TestWindowOperations(unittest.TestCase): cls.data_generator = ModalityRandomDataGenerator() cls.aggregations = ["mean", "sum", "max", "min"] - def test_window_operations_on_audio_representations(self): + def test_static_window(self): + num_windows = 5 + data, md = self.data_generator.create_visual_modality(self.num_instances, 50) + modality = UnimodalModality( + TestDataLoader( + [i for i in range(0, self.num_instances)], + None, + ModalityType.VIDEO, + data, + np.float32, + md, + ) + ) + aggregated_window = modality.context(StaticWindow(num_windows)) + + for i in range(0, self.num_instances): + assert len(aggregated_window.data[i]) == num_windows + + def test_dynamic_window(self): + num_windows = 5 + data, md = self.data_generator.create_visual_modality(self.num_instances, 50) + modality = UnimodalModality( + TestDataLoader( + [i for i in range(0, self.num_instances)], + None, + ModalityType.VIDEO, + data, + np.float32, + 
md, + ) + ) + aggregated_window = modality.context(DynamicWindow(num_windows)) + + for i in range(0, self.num_instances): + assert len(aggregated_window.data[i]) == num_windows + + def test_window_aggregation_on_audio_representations(self): window_size = 10 - self.run_window_operations_for_modality(ModalityType.AUDIO, window_size) + self.run_window_aggregation_for_modality(ModalityType.AUDIO, window_size) def test_window_operations_on_video_representations(self): window_size = 10 - self.run_window_operations_for_modality(ModalityType.VIDEO, window_size) + self.run_window_aggregation_for_modality(ModalityType.VIDEO, window_size) def test_window_operations_on_text_representations(self): window_size = 10 - self.run_window_operations_for_modality(ModalityType.TEXT, window_size) + self.run_window_aggregation_for_modality(ModalityType.TEXT, window_size) - def run_window_operations_for_modality(self, modality_type, window_size): + def run_window_aggregation_for_modality(self, modality_type, window_size): r = self.data_generator.create1DModality(40, 100, modality_type) for aggregation in self.aggregations: windowed_modality = r.window_aggregation(window_size, aggregation)