This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 3d6190b2ac7ae82b47166736c3ba46c17677dcfa
Author: Bernhard Leder <[email protected]>
AuthorDate: Thu Apr 22 17:15:12 2021 +0200

    [SYSTEMDS-2882] Python Frame Support
    
    This commit adds frame support in our python API.
    
    - Add support for Pandas to Frame
    - Add support for reading Frames from python API
    - Add support for transform encode
    - Add support for transform decode
    
    Closes #1239
---
 .github/workflows/federatedPython.yml              |   2 +-
 .github/workflows/python.yml                       |   2 +-
 src/main/python/setup.py                           |   3 +-
 .../python/systemds/context/systemds_context.py    |  25 +++-
 src/main/python/systemds/frame/__init__.py         |  24 ++++
 src/main/python/systemds/frame/frame.py            | 128 ++++++++++++++++++++
 .../python/systemds/operator/algorithm/__init__.py |   3 -
 .../python/systemds/operator/operation_node.py     | 100 ++++++++++++----
 src/main/python/systemds/utils/converters.py       |  75 +++++++++++-
 src/main/python/tests/frame/data/homes.csv         |  21 ++++
 .../python/tests/frame/data/homes.tfspec_bin2.json |   6 +
 .../tests/frame/data/homes.tfspec_recode2.json     |   3 +
 src/main/python/tests/frame/test_hyperband.py      |  86 ++++++++++++++
 src/main/python/tests/frame/test_r_c_bind.py       | 130 +++++++++++++++++++++
 .../python/tests/frame/test_transform_apply.py     |  86 ++++++++++++++
 .../python/tests/frame/test_transform_encode.py    |  75 ++++++++++++
 src/main/python/tests/frame/test_write_read.py     |  74 ++++++++++++
 17 files changed, 811 insertions(+), 32 deletions(-)

diff --git a/.github/workflows/federatedPython.yml 
b/.github/workflows/federatedPython.yml
index 2c4dcdb..ed192d0 100644
--- a/.github/workflows/federatedPython.yml
+++ b/.github/workflows/federatedPython.yml
@@ -74,7 +74,7 @@ jobs:
           ${{ runner.os }}-pip-${{ matrix.python-version }}-
   
     - name: Install pip Dependencies
-      run: pip install numpy py4j wheel
+      run: pip install numpy py4j wheel pandas
 
     - name: Build Python Package
       run: |
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 9947135..a34a757 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -85,7 +85,7 @@ jobs:
       run: sudo apt-get install protobuf-compiler libprotoc-dev 
   
     - name: Install pip Dependencies
-      run: pip install numpy py4j wheel scipy sklearn requests
+      run: pip install numpy py4j wheel scipy sklearn requests pandas
 
     - name: Build Python Package
       run: |
diff --git a/src/main/python/setup.py b/src/main/python/setup.py
index 5939101..f79d220 100755
--- a/src/main/python/setup.py
+++ b/src/main/python/setup.py
@@ -38,7 +38,8 @@ ARTIFACT_VERSION_SHORT = ARTIFACT_VERSION.split("-")[0]
 REQUIRED_PACKAGES = [
     'numpy >= 1.8.2',
     'py4j >= 0.10.9',
-    'requests >= 2.24.0'
+    'requests >= 2.24.0',
+    'pandas >= 1.2.2' 
 ]
 
 python_dir = 'systemds'
diff --git a/src/main/python/systemds/context/systemds_context.py 
b/src/main/python/systemds/context/systemds_context.py
index 107d875..f274a25 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -40,6 +40,8 @@ from systemds.script_building import OutputType
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.helpers import get_module_dir
 
+from systemds.frame import Frame
+
 
 class SystemDSContext(object):
     """A context with a connection to a java instance with which SystemDS 
operations are executed. 
@@ -299,7 +301,28 @@ class SystemDSContext(object):
         See: 
http://apache.github.io/systemds/site/dml-language-reference#readwrite-built-in-functions
 for more details
         :return: an Operation Node, containing the read data.
         """
-        return OperationNode(self, 'read', [f'"{path}"'], 
named_input_nodes=kwargs, shape=(-1,))
+        data_type = kwargs.get("data_type", None)
+        file_format = kwargs.get("format", None)
+        if data_type == "frame":
+            kwargs["data_type"] = f'"{data_type}"'
+            if isinstance(file_format, str):
+                kwargs["format"] = f'"{kwargs["format"]}"'
+            return Frame(self, None, f'"{path}"', **kwargs)
+        elif data_type == "scalar":
+            kwargs["data_type"] = f'"{data_type}"'
+            value_type = kwargs.get("value_type", None)
+            if value_type == "string":
+                kwargs["value_type"] = f'"{kwargs["value_type"]}"'
+                return OperationNode(
+                    self,
+                    "read",
+                    [f'"{path}"'],
+                    named_input_nodes=kwargs,
+                    shape=(-1,),
+                    output_type=OutputType.SCALAR,
+                )
+        return OperationNode(self, "read", [f'"{path}"'], 
+                             named_input_nodes=kwargs, shape=(-1,))
 
     def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode':
         """ Construct an scalar value, this can contain str, float, double, 
integers and booleans.
diff --git a/src/main/python/systemds/frame/__init__.py 
b/src/main/python/systemds/frame/__init__.py
new file mode 100644
index 0000000..47b9b6c
--- /dev/null
+++ b/src/main/python/systemds/frame/__init__.py
@@ -0,0 +1,24 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from systemds.frame.frame import Frame
+
+__all__ = ["Frame"]
diff --git a/src/main/python/systemds/frame/frame.py 
b/src/main/python/systemds/frame/frame.py
new file mode 100644
index 0000000..5cc4554
--- /dev/null
+++ b/src/main/python/systemds/frame/frame.py
@@ -0,0 +1,128 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import os
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+import numpy as np
+import pandas as pd
+from py4j.java_gateway import JavaObject, JVMView
+from systemds.operator import OperationNode
+from systemds.utils.consts import VALID_INPUT_TYPES
+from systemds.utils.converters import pandas_to_frame_block
+from systemds.script_building.dag import OutputType, DAGNode
+
+
+class Frame(OperationNode):
+    def __init__(
+        self,
+        sds_context: "SystemDSContext",
+        df: pd.DataFrame,
+        *args: Sequence[VALID_INPUT_TYPES],
+        **kwargs: Dict[str, VALID_INPUT_TYPES]
+    ) -> None:
+
+        if args:
+            unnamed_params = args
+        else:
+            unnamed_params = ["'./tmp/{file_name}'"]
+            unnamed_params.extend(args)
+
+        named_params = {"data_type": '"frame"'}
+
+        self._pd_dataframe = df
+        self.shape = None
+        if isinstance(self._pd_dataframe, pd.DataFrame):
+            self.shape = self._pd_dataframe.shape
+            named_params["rows"] = self.shape[0]
+            named_params["cols"] = self.shape[1]
+
+        named_params.update(kwargs)
+        super().__init__(
+            sds_context,
+            "read",
+            unnamed_params,
+            named_params,
+            output_type=OutputType.FRAME,
+            is_python_local_data=self._is_pandas(),
+            shape=self.shape,
+        )
+
+    def pass_python_data_to_prepared_script(
+        self, sds, var_name: str, prepared_script: JavaObject
+    ) -> None:
+        assert (
+            self.is_python_local_data
+        ), "Can only pass data to prepared script if it is python local!"
+        if self._is_pandas():
+            prepared_script.setFrame(
+                var_name, pandas_to_frame_block(sds, self._pd_dataframe), True
+            )  # True for reuse
+
+    def code_line(
+        self,
+        var_name: str,
+        unnamed_input_vars: Sequence[str],
+        named_input_vars: Dict[str, str],
+    ) -> str:
+        code_line = super().code_line(var_name, unnamed_input_vars, 
named_input_vars)
+        if self._is_pandas():
+            code_line = code_line.format(file_name=var_name)
+        return code_line
+
+    def compute(
+        self, verbose: bool = False, lineage: bool = False
+    ) -> Union[pd.DataFrame]:
+        if self._is_pandas():
+            if verbose:
+                print("[Pandas Frame - No Compilation necessary]")
+            return self._pd_dataframe
+        else:
+            return super().compute(verbose, lineage)
+
+    def _is_pandas(self) -> bool:
+        return self._pd_dataframe is not None
+
+    def transform_encode(self, spec):
+        self._check_frame_op()
+        self._check_other(spec, OutputType.SCALAR)
+        params_dict = {"target": self, "spec": spec}
+        return OperationNode(
+            self.sds_context,
+            "transformencode",
+            named_input_nodes=params_dict,
+            output_type=OutputType.LIST,
+            number_of_outputs=2,
+            output_types=[OutputType.MATRIX, OutputType.FRAME],
+        )
+
+    def transform_apply(self, spec: "OperationNode", meta: "OperationNode"):
+        self._check_frame_op()
+        self._check_other(spec, OutputType.SCALAR)
+        self._check_other(meta, OutputType.FRAME)
+        params_dict = {"target": self, "spec": spec, "meta": meta}
+        return OperationNode(
+            self.sds_context,
+            "transformapply",
+            named_input_nodes=params_dict,
+            output_type=OutputType.MATRIX,
+            number_of_outputs=1,
+        )
diff --git a/src/main/python/systemds/operator/algorithm/__init__.py 
b/src/main/python/systemds/operator/algorithm/__init__.py
index b3bcd3d..25d4602 100644
--- a/src/main/python/systemds/operator/algorithm/__init__.py
+++ b/src/main/python/systemds/operator/algorithm/__init__.py
@@ -41,7 +41,6 @@ from .builtin.dbscan import dbscan
 from .builtin.decisionTree import decisionTree 
 from .builtin.discoverFD import discoverFD 
 from .builtin.dist import dist 
-from .builtin.gaussianClassifier import gaussianClassifier 
 from .builtin.getAccuracy import getAccuracy 
 from .builtin.glm import glm 
 from .builtin.gmm import gmm 
@@ -62,7 +61,6 @@ from .builtin.kmeans import kmeans
 from .builtin.kmeansPredict import kmeansPredict 
 from .builtin.knnbf import knnbf 
 from .builtin.l2svm import l2svm 
-from .builtin.l2svmPredict import l2svmPredict 
 from .builtin.lasso import lasso 
 from .builtin.lm import lm 
 from .builtin.lmCG import lmCG 
@@ -95,7 +93,6 @@ from .builtin.splitBalanced import splitBalanced
 from .builtin.statsNA import statsNA 
 from .builtin.steplm import steplm 
 from .builtin.toOneHot import toOneHot 
-from .builtin.tomeklink import tomeklink 
 from .builtin.univar import univar 
 from .builtin.vectorToCsv import vectorToCsv 
 from .builtin.winsorize import winsorize 
diff --git a/src/main/python/systemds/operator/operation_node.py 
b/src/main/python/systemds/operator/operation_node.py
index e2ac681..62f9f53 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -27,7 +27,7 @@ from py4j.java_gateway import JVMView, JavaObject
 
 from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, 
VALID_ARITHMETIC_TYPES
 from systemds.utils.helpers import create_params_string
-from systemds.utils.converters import matrix_block_to_numpy
+from systemds.utils.converters import matrix_block_to_numpy, 
frame_block_to_pandas
 from systemds.script_building.script import DMLScript
 from systemds.script_building.dag import OutputType, DAGNode
 
@@ -39,6 +39,7 @@ if TYPE_CHECKING:
 
 class OperationNode(DAGNode):
     """A Node representing an operation in SystemDS"""
+
     shape: Optional[Tuple[int]]
     _result_var: Optional[Union[float, np.array]]
     _lineage_trace: Optional[str]
@@ -99,7 +100,7 @@ class OperationNode(DAGNode):
             else:
                 result_variables = self._script.execute()
 
-            self._result_var =  
self.__parse_output_result_variables(result_variables)
+            self._result_var = 
self.__parse_output_result_variables(result_variables)
 
         if verbose:
             for x in self.sds_context.get_stdout():
@@ -119,6 +120,10 @@ class OperationNode(DAGNode):
             return self.__parse_output_result_matrix(result_variables, 
self._script.out_var_name[0])
         elif self.output_type == OutputType.LIST:
             return self.__parse_output_result_list(result_variables)
+        elif self.output_type == OutputType.FRAME:
+            return self.__parse_output_result_frame(
+                result_variables, self._script.out_var_name[0]
+            )
 
     def __parse_output_result_double(self, result_variables, var_name):
         return result_variables.getDouble(var_name)
@@ -127,11 +132,19 @@ class OperationNode(DAGNode):
         return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
                                          
result_variables.getMatrixBlock(var_name))
 
+    def __parse_output_result_frame(self, result_variables, var_name):
+        return frame_block_to_pandas(
+            self.sds_context, result_variables.getFrameBlock(var_name)
+        )
+
     def __parse_output_result_list(self, result_variables):
         result_var = []
         for idx, v in enumerate(self._script.out_var_name):
             if(self._output_types == None or self._output_types[idx] == 
OutputType.MATRIX):
                 
result_var.append(self.__parse_output_result_matrix(result_variables,v))
+            elif self._output_types[idx] == OutputType.FRAME:
+                
result_var.append(self.__parse_output_result_frame(result_variables, v))
+
             else:
                 result_var.append(result_variables.getDouble(
                     self._script.out_var_name[idx]))
@@ -184,7 +197,43 @@ class OperationNode(DAGNode):
         :raise: AssertionError
         """
         assert self.output_type == OutputType.MATRIX, f'{self.operation} only 
supported for matrices'
+    
+    def _check_frame_op(self):
+        """Perform checks to assure operation is allowed to be performed on 
data type of this `OperationNode`
+
+        :raise: AssertionError
+        """
+        assert (
+            self.output_type == OutputType.FRAME
+        ), f"{self.operation} only supported for frames"
+
+    def _check_matrix_or_frame_op(self):
+        """Perform checks to assure operation is allowed to be performed on 
data type of this `OperationNode`
+
+        :raise: AssertionError
+        """
+        assert (
+            self.output_type == OutputType.FRAME
+            or self.output_type == OutputType.MATRIX
+        ), f"{self.operation} only supported for frames or matrices"
+
+    def _check_equal_op_type_as(self, other: "OperationNode"):
+        """Perform checks to assure operation is equal to 'other'. Used for 
rBind and cBind type equality check.
+
+        :raise: AssertionError
+        """
+        assert (
+            self.output_type == other.output_type
+        ), f"{self.operation} only supported for Nodes of equal output-type. 
Got self: {self.output_type} and other: {other.output_type}"
+
+    def _check_other(self, other: "OperationNode", expectedOutputType: 
OutputType):
+        """Perform check to assure other operation has expected output type.
+
+        :raise: AssertionError
+        """
+        assert other.output_type == expectedOutputType
 
+    
     def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode':
         return OperationNode(self.sds_context, '+', [self, other], 
shape=self.shape)
 
@@ -500,32 +549,39 @@ class OperationNode(DAGNode):
 
     def rbind(self, other) -> 'OperationNode':
         """
-        Row-wise matrix concatenation, by concatenating the second matrix as 
additional rows to the first matrix. 
-        :param: The other matrix to bind to the right hand side
-        :return: The OperationNode containing the concatenated matrices.
+        Row-wise matrix/frame concatenation, by concatenating the second 
matrix/frame as additional rows to the first matrix/frame. 
+        :param: The other matrix/frame to bind below as additional rows
+        :return: The OperationNode containing the concatenated matrices/frames.
         """
-
-        self._check_matrix_op()
-        other._check_matrix_op()
-
+        self._check_equal_op_type_as(other)
+        self._check_matrix_or_frame_op()
         if self.shape[1] != other.shape[1]:
             raise ValueError(
-                "The input matrices to rbind does not have the same number of 
columns")
-
-        return OperationNode(self.sds_context, 'rbind', [self, other], 
shape=(self.shape[0] + other.shape[0], self.shape[1]))
+                "The inputs to rbind do not have the same number of columns"
+            )
+        return OperationNode(
+            self.sds_context,
+            "rbind",
+            [self, other],
+            shape=(self.shape[0] + other.shape[0], self.shape[1]),
+            output_type=self._output_type,
+        )
 
     def cbind(self, other) -> 'OperationNode':
         """
-        Column-wise matrix concatenation, by concatenating the second matrix 
as additional columns to the first matrix. 
-        :param: The other matrix to bind to the right hand side.
-        :return: The OperationNode containing the concatenated matrices.
+        Column-wise matrix/frame concatenation, by concatenating the second 
matrix/frame as additional columns to the first matrix/frame. 
+        :param: The other matrix/frame to bind to the right hand side.
+        :return: The OperationNode containing the concatenated matrices/frames.
         """
-
-        self._check_matrix_op()
-        other._check_matrix_op()
-
+        self._check_equal_op_type_as(other)
+        self._check_matrix_or_frame_op()
         if self.shape[0] != other.shape[0]:
-            raise ValueError(
-                "The input matrices to cbind does not have the same number of 
columns")
+            raise ValueError("The inputs to cbind do not have the same number 
of rows")
+        return OperationNode(
+            self.sds_context,
+            "cbind",
+            [self, other],
+            shape=(self.shape[0], self.shape[1] + other.shape[1],),
+            output_type=self._output_type,
+        )
 
-        return OperationNode(self.sds_context, 'cbind', [self, other], 
shape=(self.shape[0], self.shape[1] + other.shape[1]))
diff --git a/src/main/python/systemds/utils/converters.py 
b/src/main/python/systemds/utils/converters.py
index 3079a55..61a5957 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -20,7 +20,8 @@
 # -------------------------------------------------------------
 
 import numpy as np
-from py4j.java_gateway import JavaClass, JavaObject, JVMView
+import pandas as pd
+from py4j.java_gateway import JavaClass, JavaObject, JVMView, JavaGateway
 
 def numpy_to_matrix_block(sds: 'SystemDSContext', np_arr: np.array):
     """Converts a given numpy array, to internal matrix block representation.
@@ -69,5 +70,73 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):
     num_ros = mb.getNumRows()
     num_cols = mb.getNumColumns()
     buf = 
jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convertMBtoPy4JDenseArr(
-        mb)
-    return np.frombuffer(buf, count=num_ros * num_cols, 
dtype=np.float64).reshape((num_ros, num_cols))
+        mb
+    )
+    return np.frombuffer(buf, count=num_ros * num_cols, 
dtype=np.float64).reshape(
+        (num_ros, num_cols)
+    )
+
+
+def pandas_to_frame_block(sds: "SystemDSContext", pd_df: pd.DataFrame):
+    """Converts a given pandas DataFrame to the internal FrameBlock representation.
+
+    :param sds: The current systemds context.
+    :param pd_df: the pandas DataFrame to convert to a FrameBlock.
+    """
+    assert pd_df.ndim <= 2, "pd_df invalid, because it has more than 2 
dimensions"
+    rows = pd_df.shape[0]
+    cols = pd_df.shape[1]
+
+    jvm: JVMView = sds.java_gateway.jvm
+    java_gate: JavaGateway = sds.java_gateway
+
+    # pandas type mapping to systemds Valuetypes
+    data_type_mapping = {
+        np.dtype(np.object_): 
jvm.org.apache.sysds.common.Types.ValueType.STRING,
+        np.dtype(np.int64): jvm.org.apache.sysds.common.Types.ValueType.INT64,
+        np.dtype(np.float64): jvm.org.apache.sysds.common.Types.ValueType.FP64,
+        np.dtype(np.bool_): 
jvm.org.apache.sysds.common.Types.ValueType.BOOLEAN,
+        np.dtype("<M8[ns]"): 
jvm.org.apache.sysds.common.Types.ValueType.STRING,
+    }
+    schema = []
+    col_names = []
+
+    for col_name, dtype in dict(pd_df.dtypes).items():
+        col_names.append(col_name)
+        if dtype in data_type_mapping.keys():
+            schema.append(data_type_mapping[dtype])
+        else:
+            schema.append(jvm.org.apache.sysds.common.Types.ValueType.STRING)
+    try:
+        jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType
+        jc_String = jvm.java.lang.String
+        jc_FrameBlock = jvm.org.apache.sysds.runtime.matrix.data.FrameBlock
+        j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))
+        j_colNameArray = java_gate.new_array(jc_String, len(col_names))
+        j_dataArray = java_gate.new_array(jc_String, rows, cols)
+        for i in range(len(schema)):
+            j_valueTypeArray[i] = schema[i]
+        for i in range(len(col_names)):
+            j_colNameArray[i] = col_names[i]
+        j = 0
+        for j, col_name in enumerate(col_names):
+            col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
+            for i in range(col_data.shape[0]):
+                if col_data[i]:
+                    j_dataArray[i][j] = col_data[i]
+        fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
+
+        return fb
+    except Exception as e:
+        sds.exception_and_close(e)
+
+
+def frame_block_to_pandas(sds: "SystemDSContext", fb: JavaObject):
+    num_rows = fb.getNumRows()
+    num_cols = fb.getNumColumns()
+    df = pd.DataFrame()
+    for c_index in range(num_cols):
+        col_data = fb.getColumnData(c_index)
+        df[fb.getColumnName(c_index)] = np.array(col_data[:num_rows])
+
+    return df
diff --git a/src/main/python/tests/frame/data/homes.csv 
b/src/main/python/tests/frame/data/homes.csv
new file mode 100644
index 0000000..a8d6fab
--- /dev/null
+++ b/src/main/python/tests/frame/data/homes.csv
@@ -0,0 +1,21 @@
+zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice
+95141,west,1373,7,1,3,FALSE,695,698
+91312,south,3261,6,2,2,FALSE,902,906
+94555,north,1835,3,3,3,TRUE,888,892
+95141,east,2833,6,2.5,2,TRUE,927,932
+96334,south,2742,6,2.5,2,FALSE,872,876
+96334,north,2195,5,2.5,2,FALSE,799,803
+98755,north,3469,7,2.5,2,FALSE,958,963
+96334,west,1685,7,1.5,2,TRUE,757,760
+95141,west,2238,4,3,3,FALSE,894,899
+91312,west,1245,4,1,1,FALSE,547,549
+98755,south,3702,7,3,1,FALSE,959,964
+98755,north,1865,7,1,2,TRUE,742,745
+94555,north,3837,3,1,1,FALSE,839,842
+91312,west,2139,3,1,3,TRUE,820,824
+95141,north,3824,4,3,1,FALSE,954,958
+98755,east,2858,5,1.5,1,FALSE,759,762
+91312,south,1827,7,3,1,FALSE,735,738
+91312,south,3557,2,2.5,1,FALSE,888,892
+91312,south,2553,2,2.5,2,TRUE,884,889
+96334,west,1682,3,1.5,1,FALSE,625,628
\ No newline at end of file
diff --git a/src/main/python/tests/frame/data/homes.tfspec_bin2.json 
b/src/main/python/tests/frame/data/homes.tfspec_bin2.json
new file mode 100644
index 0000000..910593a
--- /dev/null
+++ b/src/main/python/tests/frame/data/homes.tfspec_bin2.json
@@ -0,0 +1,6 @@
+{
+    "recode": ["zipcode", "district", "view"],
+    "bin": [
+        { "name": "saleprice", "method": "equi-width", "numbins": 3 }, { 
"name": "sqft", "method": "equi-width", "numbins": 4 }
+    ]
+}
\ No newline at end of file
diff --git a/src/main/python/tests/frame/data/homes.tfspec_recode2.json 
b/src/main/python/tests/frame/data/homes.tfspec_recode2.json
new file mode 100644
index 0000000..394dfc7
--- /dev/null
+++ b/src/main/python/tests/frame/data/homes.tfspec_recode2.json
@@ -0,0 +1,3 @@
+{
+    "recode": ["zipcode", "district", "view"]
+}
\ No newline at end of file
diff --git a/src/main/python/tests/frame/test_hyperband.py 
b/src/main/python/tests/frame/test_hyperband.py
new file mode 100644
index 0000000..b470ed1
--- /dev/null
+++ b/src/main/python/tests/frame/test_hyperband.py
@@ -0,0 +1,86 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import os
+import shutil
+import sys
+import unittest
+
+import pandas as pd
+import numpy as np
+from systemds.context import SystemDSContext
+from systemds.frame import Frame
+from systemds.matrix import Matrix
+from systemds.operator.algorithm import hyperband
+
+
+
+class TestHyperband(unittest.TestCase):
+
+    sds: SystemDSContext = None
+    np.random.seed(42)
+    X_train = np.random.rand(50, 10)
+    y_train = np.sum(X_train, axis=1, keepdims=True) + np.random.rand(50, 1)
+    X_val = np.random.rand(50, 10)
+    y_val = np.sum(X_val, axis=1, keepdims=True) + np.random.rand(50, 1)
+    params = 'list("reg", "tol", "maxi")'
+    min_max_params = [[0, 20],[0.0001, 0.1],[5, 10]]
+    param_ranges = np.array(min_max_params)
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def tearDown(self):
+        pass
+
+    def test_hyperband(self):
+        x_train = Matrix(self.sds, self.X_train)
+        y_train = Matrix(self.sds, self.y_train)
+        x_val = Matrix(self.sds, self.X_val)
+        y_val = Matrix(self.sds, self.y_val)
+        paramRanges = Matrix(self.sds, self.param_ranges)
+        params = self.params
+        [best_weights_mat, opt_hyper_params_df] = hyperband(
+            X_train=x_train,
+            y_train=y_train,
+            X_val=x_val,
+            y_val=y_val,
+            params=params,
+            paramRanges=paramRanges,
+        ).compute()
+        self.assertTrue(isinstance(best_weights_mat, np.ndarray))
+        self.assertTrue(best_weights_mat.shape[0] == self.X_train.shape[1])
+        self.assertTrue(best_weights_mat.shape[1] == self.y_train.shape[1])
+        
+        self.assertTrue(isinstance(opt_hyper_params_df, pd.DataFrame))
+        self.assertTrue(opt_hyper_params_df.shape[0] == paramRanges.shape[0])
+        self.assertTrue(opt_hyper_params_df.shape[1] == 1)
+        for i, hyper_param in 
enumerate(opt_hyper_params_df.values.flatten().tolist()):
+            self.assertTrue(self.min_max_params[i][0] <= hyper_param <= 
self.min_max_params[i][1])
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/frame/test_r_c_bind.py 
b/src/main/python/tests/frame/test_r_c_bind.py
new file mode 100644
index 0000000..c16b4bf
--- /dev/null
+++ b/src/main/python/tests/frame/test_r_c_bind.py
@@ -0,0 +1,130 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import pandas as pd
+from systemds.context import SystemDSContext
+from systemds.frame import Frame
+
+
+class TestRCBind(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    # shape (2, 3)
+    df_cb_1 = pd.DataFrame(
+        {"col1": ["col1_hello", "col1_world"], "col2": [0, 1], "col3": [0.0, 
0.1]}
+    )
+    # shape (2, 2)
+    df_cb_2 = pd.DataFrame({"col4": ["col4_hello", "col4_world"], "col5": [0, 
1]})
+    df_cb_3 = pd.DataFrame({"col6": ["col6_hello", "col6_world"], "col7": [0, 
1]})
+
+
+    #shape (2, 3)
+    df_rb_1 = pd.DataFrame(
+        {"col1": ["col1_hello_1", "col1_world_1"], "col2": [0, 1], "col3": 
[0.0, 0.1]}
+    )
+    #shape (4, 3)
+    df_rb_2 = pd.DataFrame(
+        {
+            "col1": ["col1_hello_2", "col1_world_2", "col1_hello_2", 
"col1_world_2"],
+            "col2": [2, 3, 4, 5],
+            "col3": [0.2, 0.3, 0.4, 0.5],
+        }
+    )
+    #shape (3, 3)
+    df_rb_3 = pd.DataFrame(
+        {
+            "col1": ["col1_hello_3", "col1_world_3", "col1_hello_3"],
+            "col2": [6, 7, 8],
+            "col3": [0.6, 0.7, 0.8],
+        }
+    )
+
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_r_bind_pair(self):
+        f1 = Frame(self.sds, self.df_rb_1)
+        f2 = Frame(self.sds, self.df_rb_2)
+        result_df = f1.rbind(f2).compute()
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        target_df = pd.concat([self.df_rb_1, self.df_rb_2], ignore_index=True)
+        self.assertTrue(target_df.equals(result_df))
+
+    def test_r_bind_triple(self):
+        f1 = Frame(self.sds, self.df_rb_1)
+        f2 = Frame(self.sds, self.df_rb_2)
+        f3 = Frame(self.sds, self.df_rb_3)
+        result_df = f1.rbind(f2).rbind(f3).compute()
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        target_df = pd.concat([self.df_rb_1, self.df_rb_2, self.df_rb_3], 
ignore_index=True)
+        self.assertTrue(target_df.equals(result_df))
+    
+    def test_r_bind_triple_twostep(self):
+        f1 = Frame(self.sds, self.df_rb_1)
+        f2 = Frame(self.sds, self.df_rb_2)
+        f3 = Frame(self.sds, self.df_rb_3)
+        tmp_df = f1.rbind(f2).compute()
+        result_df = Frame(self.sds, tmp_df).rbind(f3).compute()
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        target_df = pd.concat([self.df_rb_1, self.df_rb_2, self.df_rb_3], 
ignore_index=True)
+        self.assertTrue(target_df.equals(result_df))
+    
+    def test_c_bind_pair(self):
+        f1 = Frame(self.sds, self.df_cb_1)
+        f2 = Frame(self.sds, self.df_cb_2)
+        result_df = f1.cbind(f2).compute()
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        target_df= pd.concat([self.df_cb_1, self.df_cb_2], axis=1)
+        self.assertTrue(target_df.equals(result_df))
+
+    def test_c_bind_triple(self):
+        f1 = Frame(self.sds, self.df_cb_1)
+        f2 = Frame(self.sds, self.df_cb_2)
+        f3 = Frame(self.sds, self.df_cb_3)
+        result_df = f1.cbind(f2).cbind(f3).compute()
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        target_df = pd.concat([self.df_cb_1, self.df_cb_2, self.df_cb_3], 
axis=1)
+        self.assertTrue(target_df.equals(result_df))
+
+    def test_c_bind_triple_twostep(self):
+        f1 = Frame(self.sds, self.df_cb_1)
+        f2 = Frame(self.sds, self.df_cb_2)
+        f3 = Frame(self.sds, self.df_cb_3)
+        tmp_df = f1.cbind(f2).compute()
+        result_df = Frame(self.sds, tmp_df).cbind(f3).compute()
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        target_df = pd.concat([self.df_cb_1, self.df_cb_2, self.df_cb_3], 
axis=1)
+        self.assertTrue(target_df.equals(result_df))
+
+    
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/frame/test_transform_apply.py 
b/src/main/python/tests/frame/test_transform_apply.py
new file mode 100644
index 0000000..8b41efa
--- /dev/null
+++ b/src/main/python/tests/frame/test_transform_apply.py
@@ -0,0 +1,86 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import os
+import shutil
+import sys
+import unittest
+
+import pandas as pd
+import numpy as np
+import json
+from systemds.context import SystemDSContext
+from systemds.frame import Frame
+from systemds.matrix import Matrix
+
+
class TestTransformApply(unittest.TestCase):
    """Tests for Frame.transform_apply with a recode + bin spec.

    Encodes the homes dataset once via transform_encode to obtain the
    metadata frame M, then re-applies the same spec with transform_apply
    and checks that the re-encoded matrix has the same shape.
    """

    sds: SystemDSContext = None
    # Test fixtures shipped with the repo, relative to src/main/python.
    HOMES_PATH = "tests/frame/data/homes.csv"
    HOMES_SCHEMA = '"int,string,int,int,double,int,boolean,int,int"'

    @classmethod
    def setUpClass(cls):
        cls.sds = SystemDSContext()

    @classmethod
    def tearDownClass(cls):
        cls.sds.close()

    def tearDown(self):
        pass

    def test_apply_recode_bin(self):
        JSPEC_PATH = "tests/frame/data/homes.tfspec_bin2.json"
        with open(JSPEC_PATH) as jspec_file:
            JSPEC = json.load(jspec_file)
        F1 = self.sds.read(
            self.HOMES_PATH,
            data_type="frame",
            schema=self.HOMES_SCHEMA,
            format="csv",
            header=True,
        )
        pd_F1 = F1.compute()
        jspec = self.sds.read(JSPEC_PATH, data_type="scalar", value_type="string")
        X, M = F1.transform_encode(spec=jspec).compute()
        self.assertTrue(isinstance(X, np.ndarray))
        self.assertTrue(isinstance(M, pd.DataFrame))
        self.assertTrue(X.shape == pd_F1.shape)
        self.assertTrue(np.all(np.isreal(X)))
        # Recoded columns: metadata holds one entry per distinct input value.
        # (The original version also collected the column indices into a
        # `relevant_columns` set that was never read; that dead accumulator
        # has been removed.)
        for col_name in JSPEC["recode"]:
            self.assertTrue(M[col_name].nunique() == pd_F1[col_name].nunique())
        # Binned columns: metadata holds exactly `numbins` entries.
        for binning in JSPEC["bin"]:
            col_name = binning["name"]
            self.assertTrue(M[col_name].nunique() == binning["numbins"])

        # Re-applying the same spec with the encode metadata must reproduce
        # the shape of the originally encoded matrix.
        X2 = F1.transform_apply(spec=jspec, meta=Frame(self.sds, M)).compute()
        self.assertTrue(X.shape == X2.shape)
        self.assertTrue(np.all(np.isreal(X2)))
+
+
+
if __name__ == "__main__":
    # exit=False keeps the interpreter alive after the run, so this file can
    # also be executed from an outer test harness without terminating it.
    unittest.main(exit=False)
diff --git a/src/main/python/tests/frame/test_transform_encode.py 
b/src/main/python/tests/frame/test_transform_encode.py
new file mode 100644
index 0000000..4e88190
--- /dev/null
+++ b/src/main/python/tests/frame/test_transform_encode.py
@@ -0,0 +1,75 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import os
+import shutil
+import sys
+import unittest
+
+import pandas as pd
+import numpy as np
+import json
+from systemds.context import SystemDSContext
+from systemds.frame import Frame
+from systemds.matrix import Matrix
+
+
class TestTransformEncode(unittest.TestCase):
    """Checks Frame.transform_encode with a recode-only spec on homes.csv."""

    sds: SystemDSContext = None
    HOMES_PATH = "tests/frame/data/homes.csv"
    HOMES_SCHEMA = '"int,string,int,int,double,int,boolean,int,int"'

    @classmethod
    def setUpClass(cls):
        cls.sds = SystemDSContext()

    @classmethod
    def tearDownClass(cls):
        cls.sds.close()

    def tearDown(self):
        pass

    def test_encode_recode(self):
        JSPEC_PATH = "tests/frame/data/homes.tfspec_recode2.json"
        with open(JSPEC_PATH) as handle:
            spec_dict = json.load(handle)
        frame = self.sds.read(
            self.HOMES_PATH,
            data_type="frame",
            schema=self.HOMES_SCHEMA,
            format="csv",
            header=True,
        )
        homes_df = frame.compute()
        spec_node = self.sds.read(JSPEC_PATH, data_type="scalar", value_type="string")
        X, M = frame.transform_encode(spec=spec_node).compute()
        self.assertIsInstance(X, np.ndarray)
        self.assertIsInstance(M, pd.DataFrame)
        self.assertEqual(X.shape, homes_df.shape)
        self.assertTrue(np.all(np.isreal(X)))
        # Each recoded column keeps one metadata entry per distinct value.
        for column in spec_dict["recode"]:
            self.assertEqual(M[column].nunique(), homes_df[column].nunique())
+
+
if __name__ == "__main__":
    # exit=False keeps the interpreter alive after the run, so this file can
    # also be executed from an outer test harness without terminating it.
    unittest.main(exit=False)
diff --git a/src/main/python/tests/frame/test_write_read.py 
b/src/main/python/tests/frame/test_write_read.py
new file mode 100644
index 0000000..712efb9
--- /dev/null
+++ b/src/main/python/tests/frame/test_write_read.py
@@ -0,0 +1,74 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import os
+import shutil
+import sys
+import unittest
+
+import pandas as pd
+from systemds.context import SystemDSContext
+from systemds.frame import Frame
+
+
class TestWriteRead(unittest.TestCase):
    """Round-trips a pandas DataFrame through SystemDS frame write/read."""

    sds: SystemDSContext = None
    temp_dir: str = "tests/frame/temp_write/"
    n_cols = 3
    n_rows = 100
    # Mixed-type frame: string, int and float columns.
    df = pd.DataFrame(
        {
            "col1": [f"col1_string_{i}" for i in range(n_rows)],
            "col2": [i for i in range(n_rows)],
            "col3": [i * 0.1 for i in range(n_rows)],
        }
    )

    @classmethod
    def setUpClass(cls):
        cls.sds = SystemDSContext()

    @classmethod
    def tearDownClass(cls):
        cls.sds.close()

    def tearDown(self):
        # Drop whatever the individual test case wrote to disk.
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_write_read_binary(self):
        # Default (binary) format; compared by raw cell values.
        path = self.temp_dir + "01"
        Frame(self.sds, self.df).write(path).compute()
        read_back = self.sds.read(path, data_type="frame").compute()
        self.assertTrue((self.df.values == read_back.values).all())

    def test_write_read_csv(self):
        # CSV with header; the round-tripped frame must equal the original.
        path = self.temp_dir + "02"
        Frame(self.sds, self.df).write(path, header=True, format="csv").compute()
        read_back = self.sds.read(path, data_type="frame", format="csv").compute()
        self.assertIsInstance(read_back, pd.DataFrame)
        self.assertTrue(self.df.equals(read_back))
+
+
if __name__ == "__main__":
    # exit=False keeps the interpreter alive after the run, so this file can
    # also be executed from an outer test harness without terminating it.
    unittest.main(exit=False)

Reply via email to