This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 4434eeb [SYSTEMDS-3202] Python slice selection of rows and cols 4434eeb is described below commit 4434eeba465fecc6295d63a3f33ef47cfaa4f642 Author: baunsgaard <baunsga...@tugraz.at> AuthorDate: Fri Nov 5 15:09:42 2021 +0100 [SYSTEMDS-3202] Python slice selection of rows and cols This commit adds slice arguments to the Python API, allowing a selection of rows or columns to be taken from both matrices and frames (indices are 0-based, as in NumPy): X = sds.from_numpy(a) getCols 1 and 2: b = X[:,[1,2]] getRows 3 and 5: b = X[[3,5]] This matches NumPy's list-indexing syntax. Closes #1438 --- src/main/python/systemds/operator/nodes/frame.py | 37 +++++++-- src/main/python/systemds/operator/nodes/matrix.py | 43 ++++++++--- src/main/python/systemds/script_building/script.py | 4 +- src/main/python/systemds/utils/helpers.py | 18 +++-- src/main/python/tests/frame/test_slice.py | 87 ++++++++++++++++++++++ src/main/python/tests/matrix/test_slice.py | 81 ++++++++++++++++++++ src/main/python/tests/matrix/test_split.py | 2 +- 7 files changed, 250 insertions(+), 22 deletions(-) diff --git a/src/main/python/systemds/operator/nodes/frame.py b/src/main/python/systemds/operator/nodes/frame.py index efa75b0..a609929 100644 --- a/src/main/python/systemds/operator/nodes/frame.py +++ b/src/main/python/systemds/operator/nodes/frame.py @@ -28,12 +28,12 @@ from typing import (TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple, import numpy as np import pandas as pd from py4j.java_gateway import JavaObject, JVMView -from systemds.operator import Matrix, MultiReturn, OperationNode +from systemds.operator import Matrix, MultiReturn, OperationNode, Scalar from systemds.script_building.dag import DAGNode, OutputType from systemds.utils.consts import VALID_INPUT_TYPES from systemds.utils.converters import (frame_block_to_pandas, pandas_to_frame_block) -from systemds.utils.helpers import get_slice_string +from systemds.utils.helpers import check_is_empty_slice, 
check_no_less_than_zero, get_slice_string if TYPE_CHECKING: # to avoid cyclic dependencies during runtime @@ -73,7 +73,7 @@ class Frame(OperationNode): code_line = code_line.format(file_name=var_name) return code_line - def compute(self, verbose: bool = False, lineage: bool = False) -> Union[pd.DataFrame]: + def compute(self, verbose: bool = False, lineage: bool = False) -> pd.DataFrame: if self._is_pandas(): if verbose: print("[Pandas Frame - No Compilation necessary]") @@ -139,6 +139,33 @@ class Frame(OperationNode): def __str__(self): return "FrameNode" + def nRow(self) -> 'Scalar': + return Scalar(self.sds_context, 'nrow', [self]) + + def nCol(self) -> 'Scalar': + return Scalar(self.sds_context, 'ncol', [self]) + def __getitem__(self, i) -> 'Frame': - sliceIns = get_slice_string(i) - return Frame(self.sds_context, '', [self, sliceIns], brackets=True) + if isinstance(i, tuple) and len(i) > 2: + raise ValueError("Maximum of two dimensions are allowed") + elif isinstance(i, list): + check_no_less_than_zero(i) + slice = self.sds_context.from_numpy(np.array(i)) + 1 + select = Matrix(self.sds_context, "table", + [slice, 1, self.nRow(), 1]) + ret = Frame(self.sds_context, "removeEmpty", [], { + 'target': self, 'margin': '"rows"', 'select': select}) + return ret + elif isinstance(i, tuple) and isinstance(i[0], list) and isinstance(i[1], list): + raise NotImplementedError("double slicing is not supported yet") + elif isinstance(i, tuple) and check_is_empty_slice(i[0]) and isinstance(i[1], list): + check_no_less_than_zero(i[1]) + slice = self.sds_context.from_numpy(np.array(i[1])) + 1 + select = Matrix(self.sds_context, "table", + [slice, 1, self.nCol(), 1]) + ret = Frame(self.sds_context, "removeEmpty", [], { + 'target': self, 'margin': '"cols"', 'select': select}) + return ret + else: + sliceIns = get_slice_string(i) + return Frame(self.sds_context, '', [self, sliceIns], brackets=True) diff --git a/src/main/python/systemds/operator/nodes/matrix.py 
b/src/main/python/systemds/operator/nodes/matrix.py index 9def246..6816245 100644 --- a/src/main/python/systemds/operator/nodes/matrix.py +++ b/src/main/python/systemds/operator/nodes/matrix.py @@ -21,25 +21,23 @@ __all__ = ["Matrix"] -import os -from typing import (TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple, - Union) +from typing import TYPE_CHECKING, Dict, Iterable, Sequence, Union import numpy as np -from py4j.java_gateway import JavaObject, JVMView +from py4j.java_gateway import JavaObject from systemds.operator import OperationNode, Scalar from systemds.script_building.dag import OutputType from systemds.utils.consts import (BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES, VALID_INPUT_TYPES) from systemds.utils.converters import (matrix_block_to_numpy, numpy_to_matrix_block) -from systemds.utils.helpers import get_slice_string +from systemds.utils.helpers import check_is_empty_slice, check_no_less_than_zero, get_slice_string class Matrix(OperationNode): _np_array: np.array - def __init__(self, sds_context: 'SystemDSContext', operation: str, + def __init__(self, sds_context, operation: str, unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None, named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, @@ -68,7 +66,7 @@ class Matrix(OperationNode): code_line = code_line.format(file_name=var_name) return code_line - def compute(self, verbose: bool = False, lineage: bool = False) -> Union[np.array]: + def compute(self, verbose: bool = False, lineage: bool = False) -> np.array: if self._is_numpy(): if verbose: print('[Numpy Array - No Compilation necessary]') @@ -154,9 +152,36 @@ class Matrix(OperationNode): def __matmul__(self, other: 'Matrix') -> 'Matrix': return Matrix(self.sds_context, '%*%', [self, other]) + def nRow(self) -> 'Scalar': + return Scalar(self.sds_context, 'nrow', [self]) + + def nCol(self) -> 'Scalar': + return Scalar(self.sds_context, 'ncol', [self]) + def __getitem__(self, i): - sliceIns = get_slice_string(i) - return 
Matrix(self.sds_context, '', [self, sliceIns], brackets=True) + if isinstance(i, tuple) and len(i) > 2: + raise ValueError("Maximum of two dimensions are allowed") + elif isinstance(i, list): + check_no_less_than_zero(i) + slice = self.sds_context.from_numpy(np.array(i)) + 1 + select = Matrix(self.sds_context, "table", + [slice, 1, self.nRow(), 1]) + ret = Matrix(self.sds_context, "removeEmpty", [], { + 'target': self, 'margin': '"rows"', 'select': select}) + return ret + elif isinstance(i, tuple) and isinstance(i[0], list) and isinstance(i[1], list): + raise NotImplementedError("double slicing is not supported yet") + elif isinstance(i, tuple) and check_is_empty_slice(i[0]) and isinstance(i[1], list): + check_no_less_than_zero(i[1]) + slice = self.sds_context.from_numpy(np.array(i[1])) + 1 + select = Matrix(self.sds_context, "table", + [slice, 1, self.nCol(), 1]) + ret = Matrix(self.sds_context, "removeEmpty", [], { + 'target': self, 'margin': '"cols"', 'select': select}) + return ret + else: + sliceIns = get_slice_string(i) + return Matrix(self.sds_context, '', [self, sliceIns], brackets=True) def sum(self, axis: int = None) -> 'OperationNode': """Calculate sum of matrix. 
diff --git a/src/main/python/systemds/script_building/script.py b/src/main/python/systemds/script_building/script.py index 313c6b9..06753ab 100644 --- a/src/main/python/systemds/script_building/script.py +++ b/src/main/python/systemds/script_building/script.py @@ -81,7 +81,7 @@ class DMLScript: ret = self.prepared_script.executeScript() return ret except Py4JNetworkError: - exception_str = "Py4JNetworkError: no connection to JVM, most likely due to previous crash" + exception_str = "Py4JNetworkError: no connection to JVM, most likely due to previous crash or closed JVM from calls to close()" trace_back_limit = 0 except Exception as e: exception_str = str(e) @@ -111,7 +111,7 @@ class DMLScript: return ret, traces except Py4JNetworkError: - exception_str = "Py4JNetworkError: no connection to JVM, most likely due to previous crash" + exception_str = "Py4JNetworkError: no connection to JVM, most likely due to previous crash or closed JVM from calls to close()" trace_back_limit = 0 except Exception as e: exception_str = str(e) diff --git a/src/main/python/systemds/utils/helpers.py b/src/main/python/systemds/utils/helpers.py index 83ca596..b25ac65 100644 --- a/src/main/python/systemds/utils/helpers.py +++ b/src/main/python/systemds/utils/helpers.py @@ -51,12 +51,10 @@ def get_module_dir() -> os.PathLike: def get_slice_string(i): + if isinstance(i, list): + raise ValueError("Not Supported list query") if isinstance(i, tuple): - if len(i) > 2: - raise ValueError( - f'Invalid number of dimensions to slice {len(i)}, Only 2 dimensions allowed') - else: - return f'{get_slice_string(i[0])},{get_slice_string(i[1])}' + return f'{get_slice_string(i[0])},{get_slice_string(i[1])}' elif isinstance(i, slice): if i.step: raise ValueError("Invalid to slice with step in systemds") @@ -71,3 +69,13 @@ def get_slice_string(i): # + 1 since R and systemDS is 1 indexed. 
sliceIns = i+1 return sliceIns + + +def check_is_empty_slice(i): + return isinstance(i, slice) and i.start == None and i.stop == None and i.step == None + + +def check_no_less_than_zero(i: list): + for x in i: + if(x < 0): + raise ValueError("Negative index not supported in systemds") diff --git a/src/main/python/tests/frame/test_slice.py b/src/main/python/tests/frame/test_slice.py new file mode 100644 index 0000000..0bb5a31 --- /dev/null +++ b/src/main/python/tests/frame/test_slice.py @@ -0,0 +1,87 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# ------------------------------------------------------------- + +import unittest + +import numpy as np +import pandas as pd +from systemds.context import SystemDSContext + +df = pd.DataFrame( + { + "col1": ["col1_hello_3", "col1_world_3", "col1_hello_3"], + "col2": [6, 7, 8], + "col3": [0.6, 0.7, 0.8], + } +) + + +class TestFederatedAggFn(unittest.TestCase): + + sds: SystemDSContext = None + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + + @classmethod + def tearDownClass(cls): + cls.sds.close() + + def test_setup(self): + sm = self.sds.from_pandas(df) + sr = sm.compute() + self.assertTrue(isinstance(sr, pd.DataFrame)) + e = df + self.assertTrue((e.values == sr.values).all()) + + def test_slice_first_third_row(self): + sm = self.sds.from_pandas(df)[[0, 2]] + sr = sm.compute() + e = df.loc[[0, 2]] + self.assertTrue((e.values == sr.values).all()) + + def test_slice_single_row(self): + sm = self.sds.from_pandas(df)[[1]] + sr = sm.compute() + e = df.loc[[1]] + self.assertTrue((e.values == sr.values).all()) + + def test_slice_last_row(self): + with self.assertRaises(ValueError): + self.sds.from_pandas(df)[[-1]] + + # https://issues.apache.org/jira/browse/SYSTEMDS-3203 + # def test_slice_first_third_col(self): + # sm = self.sds.from_pandas(df)[:, [0, 2]] + # sr = sm.compute() + # e = df.loc[:, [0, 2]] + # self.assertTrue((e.values == sr.values).all()) + + # def test_slice_single_col(self): + # sm = self.sds.from_pandas(df)[:, [1]] + # sr = sm.compute() + # e = df.loc[:, [1]] + # self.assertTrue((e.values == sr.values).all()) + + def test_slice_row_col_both(self): + with self.assertRaises(NotImplementedError): + self.sds.from_pandas(df)[[1, 2], [0, 2]] diff --git a/src/main/python/tests/matrix/test_slice.py b/src/main/python/tests/matrix/test_slice.py new file mode 100644 index 0000000..bb988a6 --- /dev/null +++ b/src/main/python/tests/matrix/test_slice.py @@ -0,0 +1,81 @@ +# ------------------------------------------------------------- +# +# 
Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import unittest + +import numpy as np +from systemds.context import SystemDSContext + +# Seed the randomness. +np.random.seed(7) + +m = np.random.rand(3, 4) + + +class TestFederatedAggFn(unittest.TestCase): + + sds: SystemDSContext = None + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + + @classmethod + def tearDownClass(cls): + cls.sds.close() + + def test_setup(self): + sm = self.sds.from_numpy(m) + sr = sm.compute() + e = m + self.assertTrue(np.allclose(e, sr)) + + def test_slice_first_third_row(self): + sm = self.sds.from_numpy(m)[[0, 2]] + sr = sm.compute() + e = m[[0, 2]] + self.assertTrue(np.allclose(e, sr)) + + def test_slice_single_row(self): + sm = self.sds.from_numpy(m)[[1]] + sr = sm.compute() + e = m[[1]] + self.assertTrue(np.allclose(e, sr)) + + def test_slice_last_row(self): + with self.assertRaises(ValueError): + self.sds.from_numpy(m)[[-1]] + + def test_slice_first_third_col(self): + sm = self.sds.from_numpy(m)[:, [0, 2]] + sr = sm.compute() + e = m[:, [0, 2]] + self.assertTrue(np.allclose(e, sr)) + + def test_slice_single_col(self): + sm = self.sds.from_numpy(m)[:, [1]] + sr = 
sm.compute() + e = m[:, [1]] + self.assertTrue(np.allclose(e, sr)) + + def test_slice_row_col_both(self): + with self.assertRaises(NotImplementedError): + self.sds.from_numpy(m)[[1, 2], [0, 3]] diff --git a/src/main/python/tests/matrix/test_split.py b/src/main/python/tests/matrix/test_split.py index 7c92353..2920821 100644 --- a/src/main/python/tests/matrix/test_split.py +++ b/src/main/python/tests/matrix/test_split.py @@ -26,7 +26,7 @@ import numpy as np from systemds.context import SystemDSContext from systemds.operator.algorithm import split -# Seed the random ness. +# Seed the randomness. np.random.seed(7) class TestOrder(unittest.TestCase):