TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619850306



##########
File path: src/main/python/systemds/matrix/matrix.py
##########
@@ -25,15 +25,15 @@
 import numpy as np
 from py4j.java_gateway import JavaObject, JVMView
 from systemds.context import SystemDSContext
-from systemds.operator import OperationNode
+from systemds.operator import OperationNode2
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.converters import numpy_to_matrix_block
 
 # TODO maybe instead of having a new class we could have a function `matrix` 
instead, adding behavior to
-#  `OperationNode` would be necessary
+#  `OperationNode2` would be necessary
 
 
-class Matrix(OperationNode):
+class Matrix(OperationNode2):

Review comment:
       The Matrix class itself isn't changed at all; I just need it to inherit 
from the temporary clone of OperationNode, which is called OperationNode2.

##########
File path: src/main/python/systemds/operator/__init__.py
##########
@@ -20,6 +20,7 @@
 # -------------------------------------------------------------
 
 from systemds.operator.operation_node import OperationNode
+from systemds.operator.operation_node2 import OperationNode2
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm]
+__all__ = [OperationNode, OperationNode2, algorithm]

Review comment:
       OperationNode2 is a temporary class which will be deleted once this 
issue is resolved. You suggested that I create a clone of OperationNode and 
make my changes in that clone. All changes are meant to be applied directly to 
OperationNode once you give me the OK.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],
+                 is_python_local_data: bool = False,

Review comment:
       My code makes OperationNode universal enough to handle all cases 
(multi-output and normal), so no separate class (for example, a "MultiNode") 
is needed.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],

Review comment:
       If you let me implement a universal OperationNode class, I won't be 
able to rely on a second node class, since there will be only one.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not 
name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy 
arrays
+        :param number_of_outputs: If set to other value than 1 then it is 
expected
+            that this operation node returns multiple values. If set remember 
to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, 
operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = 
self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = 
self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                
result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                
result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with 
binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two 
input variables'
+            return 
f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, 
named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, 
prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation 
in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on 
data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == 
OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], 
shape=self.shape)

Review comment:
       will do, thanks

##########
File path: src/main/python/systemds/script_building/dag.py
##########
@@ -45,9 +45,10 @@ class DAGNode(ABC):
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
-    _output_type: OutputType
+    _outputs: List[Tuple[str, OutputType]]
+    _output_nodes: Dict[str, Union['DAGNode']]
     _is_python_local_data: bool
-    _number_of_outputs: int
+    _dml_name: str

Review comment:
       I can create this structure if you want, but in my opinion a single 
universal class that can handle all cases has less complexity and is thus 
easier to maintain. The variable _dml_name is needed so that I can perform DFS 
on the DAG without evaluating paths more than once when more than one output 
of a node is used as an input in the DAG.

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> 
str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in 
`input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name

Review comment:
       :)

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> 
str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in 
`input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         # get variable names of unnamed parameters
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node 
in dag_node.unnamed_input_nodes]
         # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, 
input_node in
-                            dag_node.named_input_nodes.items()}
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and 
len(input_node.output_nodes) > 1:
+                named_input_vars[name] = named_input_vars[name] + name

Review comment:
       In your scripts you use [V2_0, V2_1] as the result of operations with 
multiple outputs. I am leveraging this to reduce complexity in the built 
scripts. The alternative would be to do this: V3 = V2_0; V4 = V2_1, which 
increases the number of variables in the scripts.
   
   `name` in this context ([V2_0, V2_1]) is the name of the output node edge, 
which is _0 and _1 in this case.

##########
File path: src/main/python/tests/algorithms/test_pca.py
##########
@@ -49,10 +51,38 @@ def test_500x2(self):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = Matrix(self.sds, m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", 
center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", 
center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
+    def test_500x2b(self):
+        """
+        This test constructs a line of values in 2d space. 
+        That if fit correctly maps perfectly to 1d space.
+        The check is simply if the input value was positive
+        then the output value should be similar.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        X = Matrix(self.sds, m1)
+        # print(features)
+        X._check_matrix_op()
+        params_dict = {'X': X, 'K': 1, 'scale': "FALSE", 'center': "FALSE"}
+        nodeC = OperationNode2(self.sds, 'pca', named_input_nodes=params_dict,
+                               outputs=[("res", OutputType.MATRIX), ("model", 
OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", 
OutputType.MATRIX)])
+        nodeD = OperationNode2(self.sds, 'abs', 
unnamed_input_nodes=[nodeC.output_nodes["model"]])

Review comment:
       will do, thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to