Baunsgaard commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r617303102



##########
File path: src/main/python/systemds/operator/__init__.py
##########
@@ -20,6 +20,7 @@
 # -------------------------------------------------------------
 
 from systemds.operator.operation_node import OperationNode
+from systemds.operator.operation_node2 import OperationNode2
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm]
+__all__ = [OperationNode, OperationNode2, algorithm]

Review comment:
       maybe call it MultiNode.

##########
File path: src/main/python/systemds/matrix/matrix.py
##########
@@ -25,15 +25,15 @@
 import numpy as np
 from py4j.java_gateway import JavaObject, JVMView
 from systemds.context import SystemDSContext
-from systemds.operator import OperationNode
+from systemds.operator import OperationNode2
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.converters import numpy_to_matrix_block
 
 # TODO maybe instead of having a new class we could have a function `matrix` 
instead, adding behavior to
-#  `OperationNode` would be necessary
+#  `OperationNode2` would be necessary
 
 
-class Matrix(OperationNode):
+class Matrix(OperationNode2):

Review comment:
       It should not be necessary to change Matrix at all; I would like it to
   stay an OperationNode, since it is the 'lowest' primitive.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],

Review comment:
       Make it a list of tuples of Operation Nodes, and leverage the previous 
operation node to construct the individual returns.

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> 
str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in 
`input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         # get variable names of unnamed parameters
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node 
in dag_node.unnamed_input_nodes]
         # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, 
input_node in
-                            dag_node.named_input_nodes.items()}
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and 
len(input_node.output_nodes) > 1:
+                named_input_vars[name] = named_input_vars[name] + name

Review comment:
       This seems oddly designed: if the input node has multiple returns, the
   parameter name is appended to its variable name again?
   
   Can you explain?

##########
File path: src/main/python/systemds/script_building/dag.py
##########
@@ -45,9 +45,10 @@ class DAGNode(ABC):
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
-    _output_type: OutputType
+    _outputs: List[Tuple[str, OutputType]]
+    _output_nodes: Dict[str, Union['DAGNode']]
     _is_python_local_data: bool
-    _number_of_outputs: int
+    _dml_name: str

Review comment:
       This seems bad because it changes the allocation to a List rather than a
   single element; maybe we should have a multi DAG node.
   
   If we choose to have a multi DAG node, then make two classes, one called
   SingleDAGNode and another called MultiDAGNode, that both inherit from DAGNode.
   
   Opinion?
   
   

##########
File path: src/main/python/tests/algorithms/test_pca.py
##########
@@ -49,10 +51,38 @@ def test_500x2(self):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = Matrix(self.sds, m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", 
center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", 
center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
+    def test_500x2b(self):
+        """
+        This test constructs a line of values in 2d space. 
+        That if fit correctly maps perfectly to 1d space.
+        The check is simply if the input value was positive
+        then the output value should be similar.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        X = Matrix(self.sds, m1)
+        # print(features)
+        X._check_matrix_op()
+        params_dict = {'X': X, 'K': 1, 'scale': "FALSE", 'center': "FALSE"}
+        nodeC = OperationNode2(self.sds, 'pca', named_input_nodes=params_dict,
+                               outputs=[("res", OutputType.MATRIX), ("model", 
OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", 
OutputType.MATRIX)])
+        nodeD = OperationNode2(self.sds, 'abs', 
unnamed_input_nodes=[nodeC.output_nodes["model"]])

Review comment:
       In the end the tests should reflect what a user would write; therefore,
   in this case the goal becomes one of these three:
   
   
   ```python
   [res, model, _, _] = pca(...)
   res = abs(res)
   ```
   
   or
   
   ```python
   m= pca(...)
   res = abs(m[0])
   ```
   
   or
   
   
   ```python
   m= pca(...)
   res = abs(m["model"])
   ```
   
   Now this is challenging since it requires some changes in our generated
   algorithms.
   Maybe, to keep the project simple, you could make a new test file and do the
   following:
   
   ```python
   X = self.sds.rand(rows=1, cols=2, seed = 3)
   Y = self.sds.rand(rows=2, cols=1, seed=3)
   # I want the default to be unnamed input nodes if the argument is a list,
   # or named input nodes if it is a dictionary.
   M = multinode(sds, 'list', [X,Y])
   print(M[1]).compute()
   ```
   This should result in a generated script looking something like:
   ```dml
   X = rand(rows = 1, cols = 2,  seed = 3)
   Y = rand(rows = 2, cols = 1, seed = 3)
   m = list(X,Y)
   print(toString(m[1]))
   ```
   
   If it is not clear please say so :)

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not 
name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy 
arrays
+        :param number_of_outputs: If set to other value than 1 then it is 
expected
+            that this operation node returns multiple values. If set remember 
to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, 
operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = 
self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = 
self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                
result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                
result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with 
binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two 
input variables'
+            return 
f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, 
named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, 
prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation 
in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on 
data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == 
OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], 
shape=self.shape)

Review comment:
       Most of these are invalid, since you can't add a value to a list of
   heterogeneous outputs.
   I would suggest going through them and validating whether they can be done.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],
+                 is_python_local_data: bool = False,

Review comment:
       This node will never need to be Python-local data, since it should
   contain OperationNodes.

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> 
str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in 
`input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name

Review comment:
       nice, early termination !

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not 
name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy 
arrays
+        :param number_of_outputs: If set to other value than 1 then it is 
expected
+            that this operation node returns multiple values. If set remember 
to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, 
operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = 
self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = 
self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                
result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                
result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with 
binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two 
input variables'
+            return 
f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, 
named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, 
prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation 
in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on 
data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == 
OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], 
shape=self.shape)

Review comment:
       Actually, after double-checking: all operations starting from
   pass_python_data_to_prepared_script are invalid.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to