TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619851575



##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, 
TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", 
OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not 
name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy 
arrays
+        :param number_of_outputs: If set to other value than 1 then it is 
expected
+            that this operation node returns multiple values. If set remember 
to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, 
operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = 
self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = 
self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                
result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                
result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with 
binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two 
input variables'
+            return 
f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, 
named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, 
prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation 
in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on 
data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == 
OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], 
shape=self.shape)

Review comment:
       will do, thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to